1from datetime import datetime, UTC
2from typing import Optional, Dict, Any, List, Union, get_args
3from uuid import uuid4, UUID
4
5from pydantic import Field, BaseModel, model_validator, ConfigDict
6from pydantic.types import UUID4, PositiveInt
7
8from colander_data_converter.base.common import (
9 TlpPapLevel,
10 ObjectReference,
11 Singleton,
12 LRUDict,
13)
14from colander_data_converter.base.models import CommonEntitySuperType, CommonEntitySuperTypes
15from colander_data_converter.base.types.base import CommonEntityType
16
17
[docs]
class ThreatrRepository(object, metaclass=Singleton):
    """Central store for Entity, Event, and EntityRelation instances.

    The repository keeps one dictionary per model kind, keyed by the string form of
    each object's ID, and offers insertion (``<<``), lookup (``>>``) and bulk
    reference resolution/unlinking. It is declared with the ``Singleton`` metaclass
    so that a single shared instance is used.

    Warning:
        The shared instance keeps its content until ``clear()`` is called.
    """

    entities: Dict[str, "Entity"]
    """Entity objects keyed by the string form of their ID."""

    events: Dict[str, "Event"]
    """Event objects keyed by the string form of their ID."""

    relations: Dict[str, "EntityRelation"]
    """EntityRelation objects keyed by the string form of their ID."""

    def __init__(self):
        """Create the three empty stores (events, entities, relations)."""
        self.events, self.entities, self.relations = LRUDict(), LRUDict(), LRUDict()

    def clear(self):
        """Remove every stored event, relation, and entity.

        Caution:
            The stores are emptied in place; nothing is kept for recovery.
        """
        for store in (self.events, self.relations, self.entities):
            store.clear()

    def __lshift__(self, other: Union["Entity", "Event", "EntityRelation"]) -> None:
        """Register ``other`` in the store matching its type (``repository << obj``).

        The object is stored under ``str(other.id)``. Objects that are neither an
        Entity, an EntityRelation nor an Event are ignored.

        Args:
            other: The object to register.
        """
        dispatch = (
            (Entity, self.entities),
            (EntityRelation, self.relations),
            (Event, self.events),
        )
        for model_cls, store in dispatch:
            if isinstance(other, model_cls):
                # The ID is read only once a type matched: unsupported objects
                # are not required to expose an ``id`` attribute.
                store[str(other.id)] = other
                return

    def __rshift__(self, other: str | UUID4) -> Union["Entity", "Event", "EntityRelation", str, UUID4]:
        """Look up an object by identifier (``repository >> identifier``).

        Entities are searched first, then relations, then events; the first hit wins.

        Args:
            other: The string or UUID identifier to look up.

        Returns:
            The matching Entity, EntityRelation, or Event, or ``other`` itself when
            no store contains the identifier.
        """
        key = str(other)
        for store in (self.entities, self.relations, self.events):
            if key in store:
                return store[key]
        return other

    def unlink_references(self):
        """Call ``unlink_references()`` on every stored relation, then every stored event.

        Note:
            The stored objects are modified in place.
        """
        for store in (self.relations, self.events):
            for _key, stored in store.items():
                stored.unlink_references()

    def resolve_references(self, strict=False):
        """Call ``resolve_references()`` on every stored relation, then every stored event.

        Args:
            strict: Forwarded to each object's ``resolve_references``. If True, a
                ValueError is raised when a UUID reference cannot be resolved; if
                False, unresolved references remain as UUIDs. Defaults to False.

        Raises:
            ValueError: If strict is True and a UUID reference cannot be resolved.
        """
        for store in (self.relations, self.events):
            for _key, stored in store.items():
                stored.resolve_references(strict=strict)
138
[docs]
class ThreatrType(BaseModel):
    """Base model for Threatr objects, providing repository registration and reference management.

    Every instance is handed to the ThreatrRepository right after initialization, and
    the class provides methods to unlink and resolve object references for
    serialization and deserialization workflows.

    Important:
        All Threatr model classes must inherit from this base class to ensure proper
        repository integration and reference management.
    """

    model_config: ConfigDict = ConfigDict(str_strip_whitespace=True, arbitrary_types_allowed=True, from_attributes=True)

    def model_post_init(self, __context):
        """Registers the freshly created instance in the ThreatrRepository.

        Args:
            __context: Additional context provided for post-initialization handling (unused).

        Note:
            This method is called automatically by Pydantic after model initialization.
        """
        repository = ThreatrRepository()
        repository << self

    def _convert_reference(self, ref, operation, strict=False):
        """Converts a single reference for the given operation.

        Args:
            ref: The current field value (an object, a UUID, or None).
            operation: Either 'unlink' or 'resolve'.
            strict: If True, raises a ValueError when a UUID cannot be resolved.
                Only used for the 'resolve' operation.

        Returns:
            The object's ``id`` for 'unlink', the repository lookup result for
            'resolve', or ``ref`` itself when no conversion applies.

        Raises:
            ValueError: If strict is True and a UUID reference cannot be resolved.
            AttributeError: If an object to unlink has no ``id`` attribute.
        """
        if operation == "unlink":
            if ref and type(ref) is not UUID:
                return ref.id
            return ref
        if operation == "resolve" and type(ref) is UUID:
            resolved = ThreatrRepository() >> ref
            # The repository hands back the identifier itself when nothing matches.
            if strict and isinstance(resolved, UUID):
                raise ValueError(f"Unable to resolve UUID reference {resolved}")
            return resolved
        return ref

    def _process_reference_fields(self, operation, strict=False):
        """Helper method to process reference fields for both unlinking and resolving operations.

        Fields whose annotation arguments contain ``ObjectReference`` are converted
        as single values; fields whose annotation arguments contain
        ``List[ObjectReference]`` are converted element by element. List elements
        that need no conversion are kept in place, and empty or ``None`` lists are
        left untouched.

        Args:
            operation: The operation to perform, either 'unlink' or 'resolve'.
            strict: If True, raises a ValueError when a UUID reference cannot be resolved.
                Only used for 'resolve' operation. Defaults to False.

        Raises:
            ValueError: If strict is True and a UUID reference cannot be resolved.
            AttributeError: If the class instance does not have the expected field or attribute.
        """
        for field, info in self.__class__.model_fields.items():
            annotation_args = get_args(info.annotation)
            if ObjectReference in annotation_args:
                ref = getattr(self, field)
                converted = self._convert_reference(ref, operation, strict)
                if converted is not ref:
                    setattr(self, field, converted)
            elif List[ObjectReference] in annotation_args:
                refs = getattr(self, field)
                if not refs:
                    continue
                converted_refs = [self._convert_reference(ref, operation, strict) for ref in refs]
                # Only rewrite the field when at least one element actually changed.
                if any(new is not old for new, old in zip(converted_refs, refs)):
                    setattr(self, field, converted_refs)

    def unlink_references(self):
        """Unlinks object references by replacing them with their respective UUIDs.

        This method updates model fields annotated as ``ObjectReference`` or ``List[ObjectReference]``
        by replacing object references with their ``id`` for serialization purposes.

        Note:
            This operation modifies the object in-place and is typically used before serialization.

        Raises:
            AttributeError: If the class instance does not have the expected field or attribute.
        """
        self._process_reference_fields("unlink")

    def resolve_references(self, strict=False):
        """Resolves UUID references to their corresponding objects using the ThreatrRepository.

        Fields annotated with ``ObjectReference`` or ``List[ObjectReference]`` are processed
        to fetch and replace their UUID references with actual object instances.

        Args:
            strict: If True, raises a ValueError when a UUID reference cannot be resolved.
                If False, unresolved references remain as UUIDs. Defaults to False.

        Raises:
            ValueError: If strict is True and a UUID reference cannot be resolved.

        Important:
            Use ``strict=True`` to ensure all references are valid and resolvable.
        """
        self._process_reference_fields("resolve", strict)
240
[docs]
class Entity(ThreatrType):
    """Represents an entity in the Threatr data model.

    Entities are the primary data objects in Threatr, representing observables,
    indicators, or other threat intelligence artifacts with associated metadata
    and classification levels.

    Note:
        ``created_at`` and ``updated_at`` default to the current UTC time at
        instantiation. They are produced by separate factory calls, so the two
        defaults may differ by a few microseconds.
    """

    id: UUID4 = Field(frozen=True, default_factory=uuid4)
    """The unique identifier for the entity."""

    # default_factory (not default=...) so the timestamp is taken per instance
    # instead of once when the module is imported.
    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC), frozen=True)
    """The timestamp when the entity was created."""

    updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
    """The timestamp when the entity was last updated."""

    name: str = Field(..., min_length=1, max_length=512)
    """The name of the entity."""

    type: CommonEntityType
    """The specific type of the entity."""

    super_type: CommonEntitySuperType
    """The super type classification of the entity."""

    description: str | None = None
    """Optional description of the entity."""

    pap: TlpPapLevel = TlpPapLevel.WHITE
    """The PAP (Permissible Actions Protocol) level for the entity."""

    source_url: str | None = None
    """Optional source URL for the entity."""

    tlp: TlpPapLevel = TlpPapLevel.WHITE
    """The TLP (Traffic Light Protocol) level for the entity."""

    attributes: Optional[Dict[str, str | None]] = None
    """Dictionary of additional attributes."""
282
[docs]
class EntityRelation(ThreatrType):
    """Represents a relation between two entities in the Threatr data model.

    EntityRelations define directed relationships between entities, supporting
    complex threat intelligence graphs and entity associations.

    Note:
        ``created_at`` and ``updated_at`` default to the current UTC time at
        instantiation. They are produced by separate factory calls, so the two
        defaults may differ by a few microseconds.
    """

    id: UUID4 = Field(frozen=True, default_factory=uuid4)
    """The unique identifier for the entity relation."""

    # default_factory (not default=...) so the timestamp is taken per instance
    # instead of once when the module is imported.
    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC), frozen=True)
    """The timestamp when the entity relation was created."""

    updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
    """The timestamp when the entity relation was last updated."""

    name: str = Field(..., min_length=1, max_length=512)
    """The name of the entity relation."""

    description: str | None = None
    """Optional description of the relation."""

    attributes: Optional[Dict[str, str | None]] = None
    """Dictionary of additional attributes for the relation."""

    obj_from: Entity | ObjectReference = Field(...)
    """The source entity or reference in the relation."""

    obj_to: Entity | ObjectReference = Field(...)
    """The target entity or reference in the relation."""
314
[docs]
class Event(ThreatrType):
    """Represents an event in the Threatr data model.

    Events capture temporal occurrences related to threat intelligence,
    tracking when specific activities or observations took place.

    Note:
        All timestamp fields default to the current UTC time at instantiation
        rather than to a value fixed when the module was imported. An event
        created with only ``first_seen`` set to a recent time therefore gets a
        ``last_seen`` default that is not earlier than it.
    """

    id: UUID4 = Field(frozen=True, default_factory=uuid4)
    """The unique identifier for the event."""

    # default_factory (not default=...) so the timestamp is taken per instance
    # instead of once when the module is imported.
    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC), frozen=True)
    """The timestamp when the event was created."""

    updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
    """The timestamp when the event was last updated."""

    name: str = Field(..., min_length=1, max_length=512)
    """The name of the event."""

    description: str | None = None
    """Optional description of the event."""

    attributes: Optional[Dict[str, str | None]] = None
    """Dictionary of additional attributes for the event."""

    # first_seen is declared before last_seen, so when both fall back to their
    # factories the first_seen value is taken first.
    first_seen: datetime = Field(default_factory=lambda: datetime.now(UTC))
    """The timestamp when the event was first observed."""

    last_seen: datetime = Field(default_factory=lambda: datetime.now(UTC))
    """The timestamp when the event was last observed."""

    count: PositiveInt = 1
    """The number of times this event was observed."""

    type: CommonEntityType
    """The type of the event."""

    super_type: CommonEntitySuperType = CommonEntitySuperTypes.EVENT
    """The super type classification of the event."""

    involved_entity: Optional[Entity] | Optional[ObjectReference] = None
    """Optional entity or reference involved in this event."""

    @model_validator(mode="after")
    def _check_dates(self) -> Any:
        """Validates that the first_seen date is not after the last_seen date.

        Equal timestamps are accepted.

        Returns:
            The validated model instance.

        Raises:
            ValueError: If first_seen is after last_seen.
        """
        if self.first_seen > self.last_seen:
            raise ValueError("first_seen must be before last_seen")
        return self
375
[docs]
class ThreatrFeed(ThreatrType):
    """Represents a feed of Threatr data, including entities, relations, and events.

    ThreatrFeed serves as a container for complete threat intelligence datasets,
    organizing related entities, their relationships, and associated events into
    a cohesive data structure.

    Note:
        The three collections are optional: an explicit ``None`` is accepted and
        is treated as an empty collection by ``resolve_references`` and
        ``unlink_references``.
    """

    root_entity: Entity
    """The root entity of the feed, corresponding to the primary requested entity."""

    entities: Optional[List[Entity]] = Field(default_factory=list)
    """List of entity objects in the feed."""

    relations: Optional[List[EntityRelation]] = Field(default_factory=list)
    """List of entity relation objects in the feed."""

    events: Optional[List[Event]] = Field(default_factory=list)
    """List of event objects in the feed."""

    @staticmethod
    def load(
        raw_object: Dict[str, Any],
        strict: bool = False,
    ) -> "ThreatrFeed":
        """Loads a ThreatrFeed from a raw object dictionary, resolving references.

        The ThreatrRepository is cleared first, so objects registered by earlier
        loads are discarded before the new feed is validated.

        Args:
            raw_object: The raw data to validate and load.
            strict: If True, raises a ValueError when a UUID reference cannot be resolved.
                If False, unresolved references remain as UUIDs.

        Returns:
            The loaded and reference-resolved feed.

        Raises:
            ValueError: If strict is True and a UUID reference cannot be resolved.

        Important:
            Use ``strict=True`` to ensure all references in the feed are valid and resolvable.
        """
        ThreatrRepository().clear()
        feed = ThreatrFeed.model_validate(raw_object)
        feed.resolve_references(strict=strict)
        return feed

    def resolve_references(self, strict=False):
        """Resolves references within entities, events, and relations.

        Calls ``resolve_references`` on each entity, then each event, then each
        relation of the feed. Collections set to ``None`` are skipped.

        Args:
            strict: If True, raises a ValueError when a UUID reference cannot be resolved.
                If False, unresolved references remain as UUIDs.

        Raises:
            ValueError: If strict is True and a UUID reference cannot be resolved.
        """
        for collection in (self.entities, self.events, self.relations):
            # The fields are Optional, so tolerate None instead of raising TypeError.
            for item in collection or ():
                item.resolve_references(strict=strict)

    def unlink_references(self) -> None:
        """Unlinks references from all entities, events, and relations within the feed.

        Calls ``unlink_references()`` on each entity, then each event, then each
        relation, replacing object references with UUIDs. Collections set to
        ``None`` are skipped.

        Note:
            This operation is useful for preparing data for serialization.
        """
        for collection in (self.entities, self.events, self.relations):
            # The fields are Optional, so tolerate None instead of raising TypeError.
            for item in collection or ():
                item.unlink_references()