1from datetime import datetime, UTC
2from typing import Optional, Dict, Any, List, Union, get_args
3from uuid import uuid4, UUID
4
5from pydantic import Field, BaseModel, model_validator, ConfigDict
6from pydantic.types import UUID4, PositiveInt
7
8from colander_data_converter.base.common import (
9 TlpPapLevel,
10 ObjectReference,
11 Singleton,
12)
13from colander_data_converter.base.models import CommonEntitySuperType, CommonEntitySuperTypes
14from colander_data_converter.base.types.base import CommonEntityType
15
16
[docs]
class ThreatrRepository(object, metaclass=Singleton):
    """Central store for ``Entity``, ``Event`` and ``EntityRelation`` instances.

    Objects are kept in three dictionaries keyed by the string form of their ID.
    The ``<<`` operator registers an object, ``>>`` looks one up by identifier, and
    ``resolve_references()`` / ``unlink_references()`` forward the call to every
    stored relation and event.

    Warning:
        The class is created with the ``Singleton`` metaclass imported from
        ``colander_data_converter.base.common``, which is expected to return one
        shared instance for every ``ThreatrRepository()`` call. Stored objects
        therefore remain registered until ``clear()`` is called.
    """

    entities: Dict[str, "Entity"]
    """Entity objects keyed by the string form of their ID."""

    events: Dict[str, "Event"]
    """Event objects keyed by the string form of their ID."""

    relations: Dict[str, "EntityRelation"]
    """EntityRelation objects keyed by the string form of their ID."""

    def __init__(self):
        """Creates the three empty stores (events, entities, relations).

        Note:
            With the ``Singleton`` metaclass this is presumably executed only for the
            first instantiation; confirm against the metaclass implementation.
        """
        self.events = {}
        self.entities = {}
        self.relations = {}

    def clear(self):
        """Empties the event, relation, and entity stores in place.

        Caution:
            The dictionaries are cleared, not replaced; every registered object is
            dropped from the repository and cannot be recovered from it afterwards.
        """
        for store in (self.events, self.relations, self.entities):
            store.clear()

    def __lshift__(self, other: Union["Entity", "Event", "EntityRelation"]) -> None:
        """Registers ``other`` in the store matching its type (``repository << obj``).

        The key is ``str(other.id)``; registering a second object with the same ID
        overwrites the first. Objects that are neither ``Entity``, ``EntityRelation``
        nor ``Event`` are ignored without error.

        Args:
            other: The object to register.
        """
        if isinstance(other, Entity):
            target = self.entities
        elif isinstance(other, EntityRelation):
            target = self.relations
        elif isinstance(other, Event):
            target = self.events
        else:
            # Unsupported types are skipped before touching ``other.id``,
            # which such objects may not define.
            return
        target[str(other.id)] = other

    def __rshift__(self, other: str | UUID4) -> Union["Entity", "Event", "EntityRelation", str, UUID4]:
        """Looks up an object by identifier (``repository >> identifier``).

        The identifier is converted to ``str`` and searched in the entity store first,
        then the relation store, then the event store; the first hit wins.

        Args:
            other: The string or UUID identifier to look up.

        Returns:
            The matching Entity, EntityRelation, or Event, or ``other`` itself
            (unchanged) when no store contains the identifier.
        """
        key = str(other)
        for store in (self.entities, self.relations, self.events):
            if key in store:
                return store[key]
        return other

    def unlink_references(self):
        """Calls ``unlink_references()`` on every stored relation, then every stored event.

        Note:
            The stored objects are modified in place. Entities are not visited.
        """
        for relation in self.relations.values():
            relation.unlink_references()
        for event in self.events.values():
            event.unlink_references()

    def resolve_references(self, strict=False):
        """Calls ``resolve_references()`` on every stored relation, then every stored event.

        Args:
            strict: Forwarded unchanged to each object's ``resolve_references``.
                Defaults to False.

        Raises:
            ValueError: Propagated from a stored object's ``resolve_references`` when
                ``strict`` is True and one of its references cannot be resolved.

        Note:
            The stored objects are modified in place. Entities are not visited.
        """
        for relation in self.relations.values():
            relation.resolve_references(strict=strict)
        for event in self.events.values():
            event.resolve_references(strict=strict)
136
137
[docs]
class ThreatrType(BaseModel):
    """Base model for Threatr objects, providing repository registration and reference management.

    Every instance is offered to the ``ThreatrRepository`` right after initialization,
    and the class supplies ``unlink_references()`` / ``resolve_references()`` to switch
    reference fields between object form and UUID form.

    Important:
        Reference handling only looks at fields whose annotation arguments contain
        ``ObjectReference`` or ``List[ObjectReference]``; other fields are never touched.
    """

    model_config = ConfigDict(
        str_strip_whitespace=True,
        arbitrary_types_allowed=True,
    )

    def model_post_init(self, __context):
        """Registers the freshly created instance in the ``ThreatrRepository``.

        Args:
            __context: Context value supplied to the post-init hook; not used here.

        Note:
            The repository's ``<<`` operator decides whether the instance is stored;
            types it does not recognise are ignored.
        """
        repository = ThreatrRepository()
        repository << self

    def _convert_reference(self, ref, operation, strict=False):
        """Converts a single reference value according to ``operation``.

        Args:
            ref: The current field value or list element (object, UUID, or ``None``).
            operation: Either 'unlink' or 'resolve'. Any other value leaves ``ref`` as is.
            strict: If True, a UUID that the repository cannot resolve raises ``ValueError``.
                Only used for the 'resolve' operation.

        Returns:
            ``ref.id`` for a truthy non-UUID value when unlinking, the repository lookup
            result for a UUID when resolving, and ``ref`` itself in every other case.

        Raises:
            ValueError: If strict is True and a UUID reference cannot be resolved.
            AttributeError: If a non-UUID value being unlinked has no ``id`` attribute.
        """
        if operation == "unlink":
            if ref and not isinstance(ref, UUID):
                return ref.id
            return ref
        if operation == "resolve" and isinstance(ref, UUID):
            # The repository returns the identifier itself when nothing matches.
            resolved = ThreatrRepository() >> ref
            if strict and isinstance(resolved, UUID):
                raise ValueError(f"Unable to resolve UUID reference {resolved}")
            return resolved
        return ref

    def _process_reference_fields(self, operation, strict=False):
        """Helper method to process reference fields for both unlinking and resolving operations.

        Scalar fields are converted directly. List fields are converted element by
        element, and every element keeps its position: elements that need no
        conversion (already a UUID when unlinking, already an object when resolving)
        are carried over unchanged instead of being dropped. ``None`` or empty list
        values are left untouched.

        Args:
            operation: The operation to perform, either 'unlink' or 'resolve'.
            strict: If True, raises a ValueError when a UUID reference cannot be resolved.
                Only used for 'resolve' operation. Defaults to False.

        Raises:
            ValueError: If strict is True and a UUID reference cannot be resolved.
            AttributeError: If the class instance does not have the expected field or attribute.
        """
        for field, info in self.__class__.model_fields.items():
            annotation_args = get_args(info.annotation)
            if ObjectReference in annotation_args:
                current = getattr(self, field)
                converted = self._convert_reference(current, operation, strict)
                if converted is not current:
                    setattr(self, field, converted)
            elif List[ObjectReference] in annotation_args:
                current_items = getattr(self, field)
                if not current_items:
                    # Nothing to convert (None or empty list).
                    continue
                converted_items = [self._convert_reference(item, operation, strict) for item in current_items]
                # Only reassign when at least one element actually changed.
                if any(new is not old for new, old in zip(converted_items, current_items)):
                    setattr(self, field, converted_items)

    def unlink_references(self):
        """Unlinks object references by replacing them with their respective UUIDs.

        Fields annotated as ``ObjectReference`` or ``List[ObjectReference]`` that hold
        objects are overwritten with the objects' ``id`` values.

        Note:
            This operation modifies the object in-place.

        Raises:
            AttributeError: If the class instance does not have the expected field or attribute.
        """
        self._process_reference_fields("unlink")

    def resolve_references(self, strict=False):
        """Resolves UUID references to their corresponding objects using the ThreatrRepository.

        Fields annotated with ``ObjectReference`` or ``List[ObjectReference]`` that hold
        UUIDs are replaced with the object the repository returns for that UUID.

        Args:
            strict: If True, raises a ValueError when a UUID reference cannot be resolved.
                If False, unresolved references remain as UUIDs. Defaults to False.

        Raises:
            ValueError: If strict is True and a UUID reference cannot be resolved.
        """
        self._process_reference_fields("resolve", strict)
241
242
[docs]
class Entity(ThreatrType):
    """Represents an entity in the Threatr data model.

    An entity carries a name, a type / super type pair, TLP and PAP levels, and
    optional descriptive metadata.

    Note:
        ``id``, ``created_at`` and ``updated_at`` are generated per instance through
        default factories, so each new entity gets its own UUID and the timestamp of
        its own creation rather than a value computed once at import time.
    """

    id: UUID4 = Field(frozen=True, default_factory=uuid4)
    """The unique identifier for the entity."""

    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC), frozen=True)
    """The timestamp when the entity was created."""

    updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
    """The timestamp when the entity was last updated."""

    name: str = Field(..., min_length=1, max_length=512)
    """The name of the entity."""

    type: CommonEntityType
    """The specific type of the entity."""

    super_type: CommonEntitySuperType
    """The super type classification of the entity."""

    description: str | None = None
    """Optional description of the entity."""

    pap: TlpPapLevel = TlpPapLevel.WHITE
    """The PAP (Permissible Actions Protocol) level for the entity."""

    source_url: str | None = None
    """Optional source URL for the entity."""

    tlp: TlpPapLevel = TlpPapLevel.WHITE
    """The TLP (Traffic Light Protocol) level for the entity."""

    attributes: Optional[Dict[str, str | None]] = None
    """Dictionary of additional attributes."""
283
284
[docs]
class EntityRelation(ThreatrType):
    """Represents a relation between two entities in the Threatr data model.

    A relation is directed: it goes from ``obj_from`` to ``obj_to``. Each end holds
    either an ``Entity`` object or an ``ObjectReference``.

    Note:
        ``id``, ``created_at`` and ``updated_at`` are generated per instance through
        default factories, so each new relation gets its own UUID and the timestamp of
        its own creation rather than a value computed once at import time.
    """

    id: UUID4 = Field(frozen=True, default_factory=uuid4)
    """The unique identifier for the entity relation."""

    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC), frozen=True)
    """The timestamp when the entity relation was created."""

    updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
    """The timestamp when the entity relation was last updated."""

    name: str = Field(..., min_length=1, max_length=512)
    """The name of the entity relation."""

    description: str | None = None
    """Optional description of the relation."""

    attributes: Optional[Dict[str, str | None]] = None
    """Dictionary of additional attributes for the relation."""

    obj_from: Entity | ObjectReference = Field(...)
    """The source entity or reference in the relation."""

    obj_to: Entity | ObjectReference = Field(...)
    """The target entity or reference in the relation."""
315
316
[docs]
class Event(ThreatrType):
    """Represents an event in the Threatr data model.

    An event records an observation window (``first_seen`` to ``last_seen``), an
    observation count, and optionally the entity involved.

    Note:
        ``id`` and all four timestamps are generated per instance through default
        factories, so omitted timestamps take the time the event object is created
        rather than a value computed once at import time. As a consequence, passing
        only a ``last_seen`` that lies in the past fails validation, because the
        defaulted ``first_seen`` (now) would be later than ``last_seen``.
    """

    id: UUID4 = Field(frozen=True, default_factory=uuid4)
    """The unique identifier for the event."""

    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC), frozen=True)
    """The timestamp when the event was created."""

    updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
    """The timestamp when the event was last updated."""

    name: str = Field(..., min_length=1, max_length=512)
    """The name of the event."""

    description: str | None = None
    """Optional description of the event."""

    attributes: Optional[Dict[str, str | None]] = None
    """Dictionary of additional attributes for the event."""

    first_seen: datetime = Field(default_factory=lambda: datetime.now(UTC))
    """The timestamp when the event was first observed."""

    last_seen: datetime = Field(default_factory=lambda: datetime.now(UTC))
    """The timestamp when the event was last observed."""

    count: PositiveInt = 1
    """The number of times this event was observed."""

    type: CommonEntityType
    """The type of the event."""

    super_type: CommonEntitySuperType = CommonEntitySuperTypes.EVENT
    """The super type classification of the event."""

    involved_entity: Optional[Entity] | Optional[ObjectReference] = None
    """Optional entity or reference involved in this event."""

    @model_validator(mode="after")
    def _check_dates(self) -> Any:
        """Validates that ``first_seen`` is not later than ``last_seen``.

        Equal timestamps are accepted; only ``first_seen > last_seen`` is rejected.

        Returns:
            The validated model instance.

        Raises:
            ValueError: If first_seen is after last_seen.
        """
        if self.first_seen > self.last_seen:
            raise ValueError("first_seen must be before last_seen")
        return self
376
377
[docs]
class ThreatrFeed(ThreatrType):
    """Represents a feed of Threatr data, including entities, relations, and events.

    The feed holds one root entity plus optional lists of entities, relations, and
    events. Its ``resolve_references()`` / ``unlink_references()`` methods forward to
    each contained object; a collection set to ``None`` is treated as empty.
    """

    root_entity: Entity
    """The root entity of the feed, corresponding to the primary requested entity."""

    entities: Optional[List[Entity]] = []
    """List of entity objects in the feed."""

    relations: Optional[List[EntityRelation]] = []
    """List of entity relation objects in the feed."""

    events: Optional[List[Event]] = []
    """List of event objects in the feed."""

    @staticmethod
    def load(
        raw_object: Dict[str, Union[Entity, Event, EntityRelation]],
        strict: bool = False,
    ) -> "ThreatrFeed":
        """Loads a ThreatrFeed from a raw object dictionary, resolving references.

        The raw object is validated with ``ThreatrFeed.model_validate`` and the
        resulting feed then has ``resolve_references`` applied.

        Args:
            raw_object: The raw data to validate and load.
            strict: If True, raises a ValueError when a UUID reference cannot be resolved.
                If False, unresolved references remain as UUIDs.

        Returns:
            The loaded and reference-resolved feed.

        Raises:
            ValueError: If strict is True and a UUID reference cannot be resolved.
        """
        feed = ThreatrFeed.model_validate(raw_object)
        feed.resolve_references(strict=strict)
        return feed

    def _iter_members(self):
        """Yields every contained entity, then every event, then every relation.

        The three collections are declared ``Optional``; a ``None`` collection is
        skipped instead of raising ``TypeError`` on iteration.
        """
        for group in (self.entities, self.events, self.relations):
            yield from group or ()

    def resolve_references(self, strict=False):
        """Resolves references within entities, events, and relations.

        Calls ``resolve_references`` on each contained entity, event, and relation,
        in that order. ``root_entity`` is not visited.

        Args:
            strict: If True, raises a ValueError when a UUID reference cannot be resolved.
                If False, unresolved references remain as UUIDs.

        Raises:
            ValueError: If strict is True and a UUID reference cannot be resolved.
        """
        for member in self._iter_members():
            member.resolve_references(strict=strict)

    def unlink_references(self) -> None:
        """Unlinks references from all entities, events, and relations within the feed.

        Calls ``unlink_references()`` on each contained entity, event, and relation,
        in that order, replacing object references with UUIDs. ``root_entity`` is
        not visited.

        Note:
            The contained objects are modified in place.
        """
        for member in self._iter_members():
            member.unlink_references()