1import abc
2import enum
3from datetime import datetime, UTC
4from typing import List, Dict, Optional, Union, Annotated, Literal, get_args, Any
5from uuid import uuid4, UUID
6
7from pydantic import (
8 PositiveInt,
9 NonNegativeInt,
10 UUID4,
11 BaseModel,
12 AnyUrl,
13 computed_field,
14 model_validator,
15 ConfigDict,
16 Field,
17)
18
19from colander_data_converter.base.common import (
20 ObjectReference,
21 TlpPapLevel,
22 Singleton,
23)
24from colander_data_converter.base.types.actor import ActorType, ActorTypes
25from colander_data_converter.base.types.artifact import ArtifactType, ArtifactTypes
26from colander_data_converter.base.types.base import EntityType_T
27from colander_data_converter.base.types.data_fragment import DataFragmentType, DataFragmentTypes
28from colander_data_converter.base.types.detection_rule import DetectionRuleType, DetectionRuleTypes
29from colander_data_converter.base.types.device import DeviceType, DeviceTypes
30from colander_data_converter.base.types.event import EventType, EventTypes
31from colander_data_converter.base.types.observable import ObservableType, ObservableTypes
32from colander_data_converter.base.types.threat import ThreatType, ThreatTypes
33
# Dotted import name of this module.
# NOTE(review): presumably used as the anchor package for resource lookups - confirm against callers.
resource_package = __name__
35
36
[docs]
def get_id(obj: Any) -> Optional[UUID4]:
    """
    Extracts a UUID identifier from the given object.

    Args:
        obj: The object to extract the UUID from. Can be a UUID string, a UUID instance,
            or any object exposing an 'id' attribute (which is itself resolved recursively).

    Returns:
        Optional[UUID4]: The extracted UUID if available, otherwise None. None is returned
        for falsy inputs, malformed UUID strings and objects without a usable 'id'.
    """
    if not obj:
        return None

    if isinstance(obj, UUID):
        return obj

    if isinstance(obj, str):
        try:
            # NOTE: passing version=4 makes the UUID constructor overwrite the version and
            # variant bits, so a well-formed non-v4 string is coerced rather than rejected.
            return UUID(obj, version=4)
        except ValueError:
            # Raised by UUID() for badly formed hexadecimal strings.
            return None

    obj_id = getattr(obj, "id", None)
    if obj_id is not None:
        return get_id(obj_id)

    return None
61
62
# Discriminated union covering every concrete entity class defined in this module.
# Use it to annotate fields that may hold any kind of entity.
# Members are written as forward references (strings) because the classes are declared further down.
# The Field discriminator 'colander_internal_type' is used for type resolution during (de)serialization.
EntityTypes = Annotated[
    Union[
        "Actor",
        "Artifact",
        "DataFragment",
        "Observable",
        "DetectionRule",
        "Device",
        "Event",
        "Threat",
    ],
    Field(discriminator="colander_internal_type"),
]
79
80
[docs]
class ColanderType(BaseModel):
    """Base class for all Colander model data_types, providing common functionality.

    This class extends Pydantic's BaseModel and is intended to be subclassed by
    all model entities. It includes methods for linking and unlinking object references,
    resolving type hints, and extracting subclass information.
    """

    model_config = ConfigDict(
        str_strip_whitespace=True,
        arbitrary_types_allowed=True,
    )

    def model_post_init(self, __context):
        """Executes post-initialization logic for the model, ensuring the repository
        registers the current subclass instance.

        Args:
            __context (Any): Additional context provided for post-initialization handling (unused).
        """
        repository = ColanderRepository()
        repository << self

    def _process_reference_fields(self, operation, strict=False):
        """Helper method to process reference fields for both unlinking and resolving operations.

        Fields whose annotation contains `ObjectReference` are treated as single references; fields
        whose annotation contains `List[ObjectReference]` are treated as lists of references.
        List elements that do not need conversion are kept in place, and empty or unset (None)
        lists are skipped.

        Args:
            operation (str): The operation to perform, either 'unlink' or 'resolve'.
            strict (bool, optional): If True, raises a ValueError when a UUID reference cannot be resolved.
                Only used for 'resolve' operation. Defaults to False.

        Raises:
            ValueError: If strict is True and a UUID reference cannot be resolved.
            AttributeError: If the class instance does not have the expected field or attribute.
        """
        for field, info in self.__class__.model_fields.items():
            annotation_args = get_args(info.annotation)
            if ObjectReference in annotation_args:
                ref = getattr(self, field)
                if operation == "unlink" and ref and not isinstance(ref, UUID):
                    setattr(self, field, ref.id)
                elif operation == "resolve" and isinstance(ref, UUID):
                    resolved = ColanderRepository() >> ref
                    # The repository hands the identifier back untouched when it is unknown.
                    if strict and isinstance(resolved, UUID):
                        raise ValueError(f"Unable to resolve UUID reference {resolved}")
                    setattr(self, field, resolved)
            elif List[ObjectReference] in annotation_args:
                refs = getattr(self, field)
                if not refs:
                    # Optional list fields may be None; nothing to convert in that case.
                    continue
                new_refs = []
                updated = False
                for ref in refs:
                    if operation == "unlink" and ref and not isinstance(ref, UUID):
                        new_refs.append(ref.id)
                        updated = True
                    elif operation == "resolve" and isinstance(ref, UUID):
                        resolved = ColanderRepository() >> ref
                        if strict and isinstance(resolved, UUID):
                            raise ValueError(f"Unable to resolve UUID reference {resolved}")
                        new_refs.append(resolved)
                        updated = True
                    else:
                        # Keep elements that need no conversion so mixed lists do not lose members.
                        new_refs.append(ref)
                if updated:
                    setattr(self, field, new_refs)

    def unlink_references(self):
        """Unlinks object references by replacing them with their respective UUIDs.

        This method updates the model fields of the class instance where
        fields annotated as `ObjectReference` or `List[ObjectReference]` exist. It replaces the
        references (of type objects) with their UUIDs if they exist.

        For fields of type `ObjectReference`, the method retrieves the field's value and replaces
        it with its `id` (UUID) if the current value is not already a UUID.

        For fields of type `List[ObjectReference]`, the method iterates through the list and
        replaces each object reference with its `id` (UUID) if the current value is
        not already a UUID. The field value is updated only if at least one
        replacement occurs.

        Raises:
            AttributeError: If the class instance does not have the expected field or attribute.
        """
        self._process_reference_fields("unlink")

    def resolve_references(self, strict=False):
        """Resolves references for the fields in the object's model.

        Fields annotated with `ObjectReference` or `List[ObjectReference]` are processed
        to fetch and replace their UUID references with respective entities using the `Repository`.

        This method updates the object in-place.

        Args:
            strict: If True, raises a ValueError when a UUID reference cannot be resolved.
                If False, unresolved references remain as UUIDs.

        Raises:
            ValueError: If strict is True and a UUID reference cannot be resolved.
        """
        self._process_reference_fields("resolve", strict)

    def is_fully_resolved(self) -> bool:
        """Tells whether every reference field holds an object rather than a bare UUID.

        References are first resolved in-place (non-strict), then every field annotated with
        `ObjectReference` or `List[ObjectReference]` is inspected.

        Returns:
            bool: False if at least one reference is still a UUID after resolution, True otherwise.
        """
        self.resolve_references()

        for field, info in self.__class__.model_fields.items():
            annotation_args = get_args(info.annotation)
            if ObjectReference in annotation_args:
                if isinstance(getattr(self, field), UUID):
                    return False
            elif List[ObjectReference] in annotation_args:
                # Optional list fields may be None.
                refs = getattr(self, field) or []
                if any(isinstance(ref, UUID) for ref in refs):
                    return False

        return True

    @classmethod
    def subclasses(cls) -> Dict[str, type["EntityTypes"]]:
        """Generates a dictionary containing all subclasses of the current class.

        This method collects all the direct subclasses of the current class and maps their
        names (converted to lowercase) to the class itself. It is primarily useful for
        organizing and accessing class hierarchies dynamically.

        Returns:
            Dict[str, type['EntityTypes']]: A dictionary where the keys are the lowercase names of the subclasses, and
            the values are the subclass data_types themselves.
        """
        return {subclass.__name__.lower(): subclass for subclass in cls.__subclasses__()}

    @classmethod
    def resolve_type(cls, content_type: str) -> type["EntityTypes"]:
        """Resolves a specific type of entity definition based on the provided content type by
        matching it against the available subclasses of the class. This utility ensures that
        the given content type is valid and matches one of the registered subclasses.

        Args:
            content_type (str): A string representing the type of content to be resolved.
                Must match the name of a subclass (case-insensitive) of the current class.

        Returns:
            type['EntityTypes']: The resolved class type corresponding to the provided content type.

        Raises:
            AssertionError: If the content type does not match any direct subclass.
        """
        _content_type = content_type.lower()
        _subclasses = cls.subclasses()
        assert _content_type in _subclasses, f"Unknown content type: {content_type!r}"
        return _subclasses[_content_type]

    @classmethod
    def extract_type_hints(cls, obj: dict) -> str:
        """Extracts type hints from a given dictionary based on specific keys.

        This class method attempts to retrieve type hints from a dictionary using a specific
        key ("colander_internal_type") or nested keys ("super_type" and its "short_name" value,
        lowercased with underscores removed). If the dictionary does not match the expected
        structure or the keys are not available, a ValueError is raised.

        Args:
            obj (dict): The dictionary from which type hints need to be extracted.

        Returns:
            str: A string representing the extracted type hint.

        Raises:
            ValueError: If the type hint cannot be extracted from the provided dictionary.
        """
        try:
            if "colander_internal_type" in obj:
                return obj.get("colander_internal_type", "")
            if "super_type" in obj:
                return obj.get("super_type").get("short_name").lower().replace("_", "")  # type: ignore[union-attr]
        except (AttributeError, TypeError):
            # Malformed payload (e.g. 'super_type' or 'short_name' is None): report it as a ValueError below.
            pass
        raise ValueError("Unable to extract type hints.")

    @computed_field
    def super_type(self) -> "CommonEntitySuperType":
        """Computed field exposing the result of `get_super_type`."""
        return self.get_super_type()

    def get_super_type(self) -> "CommonEntitySuperType":
        """Builds the super type descriptor from the name of the instance's class.

        Returns:
            CommonEntitySuperType: Descriptor whose 'name' is the class name, 'short_name' the
            upper-cased class name and '_class' the class itself.
        """
        return CommonEntitySuperType(
            **{
                "name": self.__class__.__name__,
                "short_name": self.__class__.__name__.upper(),
                "_class": self.__class__,
            }
        )
272
273
[docs]
class Case(ColanderType):
    """Case represents a collection or grouping of related entities, artifacts, or events.

    This class is used to organize and manage related data, such as incidents, investigations, or projects.

    Example:
        >>> case = Case(
        ...     name='Investigation Alpha',
        ...     description='Investigation of suspicious activity'
        ... )
        >>> print(case.name)
        Investigation Alpha
    """

    id: UUID4 = Field(frozen=True, default_factory=lambda: uuid4())
    """The unique identifier for the case."""

    # Timestamps use default_factory so that "now" is evaluated for each new instance
    # instead of once, when the module is imported.
    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC), frozen=True)
    """The timestamp when the case was created."""

    updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
    """The timestamp when the case was last updated."""

    name: str = Field(..., min_length=1, max_length=512)
    """The name of the case."""

    description: str = Field(..., min_length=1)
    """A description of the case."""

    documentation: str | None = None
    """Optional documentation or notes for the case."""

    pap: TlpPapLevel = TlpPapLevel.WHITE
    """The PAP (Permissible Actions Protocol) level for the case."""

    parent_case: Optional["Case"] | Optional[ObjectReference] = None
    """Reference to a parent case, if this case is a sub-case."""

    tlp: TlpPapLevel = TlpPapLevel.WHITE
    """The TLP (Traffic Light Protocol) level for the case."""

    colander_internal_type: Literal["case"] = "case"
    """Internal type discriminator for (de)serialization."""
317
318
[docs]
class Entity(ColanderType, abc.ABC):
    """Entity is an abstract base class representing a core object in the model.

    This class provides common fields for all entities, including identifiers, timestamps, descriptive fields,
    and references to cases. Examples include actors, artifacts, devices, etc.
    """

    id: UUID4 = Field(frozen=True, default_factory=lambda: uuid4())
    """The unique identifier for the entity."""

    # Timestamps use default_factory so that "now" is evaluated for each new instance
    # instead of once, when the module is imported.
    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC), frozen=True)
    """The timestamp when the entity was created."""

    updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
    """The timestamp when the entity was last updated."""

    name: str = Field(..., min_length=1, max_length=512)
    """The name of the entity."""

    case: Optional[Case] | Optional[ObjectReference] = None
    """Reference to the case this entity belongs to."""

    description: str | None = None
    """A description of the entity."""

    pap: TlpPapLevel = TlpPapLevel.WHITE
    """The PAP (Permissible Actions Protocol) level for the entity."""

    source_url: str | AnyUrl | None = None
    """Optional source URL for the entity."""

    tlp: TlpPapLevel = TlpPapLevel.WHITE
    """The TLP (Traffic Light Protocol) level for the entity."""

    def get_type(self) -> Optional[EntityType_T]:
        """
        Returns the type definition for this entity instance.

        This method returns the value of the 'type' attribute declared by the concrete subclass
        (e.g., ObservableType, ActorType, DeviceType).

        Returns:
            Optional[_EntityType]: The type definition object for this entity, or None when the
            instance has no 'type' attribute.
        """
        return getattr(self, "type", None)

    def get_immutable_relations(
        self, mapping: Optional[Dict[str, str]] = None, default_name: Optional[str] = None
    ) -> Dict[str, "EntityRelation"]:
        """
        Returns a dictionary of immutable relations derived from the entity's reference fields.

        This method creates EntityRelation objects by inspecting the entity's fields and identifying
        those annotated as ObjectReference or List[ObjectReference], such as 'extracted_from',
        'operated_by' or 'associated_threat'.

        Args:
            mapping (Dict[str, str], optional): A dictionary to customize relation names. Keys should
                be field names, and values should be the desired relation names. Defaults to None.
            default_name (str, optional): Relation name used for fields that have no entry in
                'mapping'. When omitted, the field name itself is used unchanged.

        Returns:
            Dict[str, "EntityRelation"]: A dictionary of EntityRelation objects keyed by the string
            representation of their IDs. Each relation has this entity as the source (obj_from) and
            the referenced entity as the target (obj_to).

        Note:
            - The 'case' field is explicitly excluded from relation generation.
            - Only fields with actual values (not None or empty) are processed.
            - Every call builds new EntityRelation instances (with new IDs); as for any model of this
              module, building them registers them in the ColanderRepository.
        """
        name_mapping = mapping or {}
        relations: Dict[str, "EntityRelation"] = {}
        for field_name, field_info in self.__class__.model_fields.items():
            if field_name == "case":
                continue
            field_annotation = get_args(field_info.annotation)
            field_value = getattr(self, field_name, None)

            if not field_value or not field_annotation:
                continue

            if ObjectReference in field_annotation:
                # Single reference.
                targets = [field_value]
            elif List[ObjectReference] in field_annotation:
                # List of references: one relation per element.
                targets = field_value
            else:
                continue

            relation_name = name_mapping.get(field_name, default_name or field_name)
            for target in targets:
                relation = EntityRelation(
                    name=relation_name,
                    obj_from=self,
                    obj_to=target,
                )
                relations[str(relation.id)] = relation

        return relations
435
436
[docs]
class EntityRelation(ColanderType):
    """EntityRelation represents a relationship between two entities in the model.

    This class is used to define and manage relationships between objects, such as associations
    between observables, devices, or actors.

    Example:
        >>> obs1 = Observable(
        ...     id=uuid4(),
        ...     name='1.1.1.1',
        ...     type=ObservableTypes.IPV4.value
        ... )
        >>> obs2 = Observable(
        ...     id=uuid4(),
        ...     name='8.8.8.8',
        ...     type=ObservableTypes.IPV4.value
        ... )
        >>> relation = EntityRelation(
        ...     id=uuid4(),
        ...     name='connection',
        ...     obj_from=obs1,
        ...     obj_to=obs2
        ... )
        >>> print(relation.name)
        connection
    """

    id: UUID4 = Field(frozen=True, default_factory=lambda: uuid4())
    """The unique identifier for the entity relation."""

    # Timestamps use default_factory so that "now" is evaluated for each new instance
    # instead of once, when the module is imported.
    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC), frozen=True)
    """The timestamp when the entity relation was created."""

    updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
    """The timestamp when the entity relation was last updated."""

    name: str = Field(..., min_length=1, max_length=512)
    """The name of the entity relation."""

    case: Optional[Case] | Optional[ObjectReference] = None
    """Reference to the case this relation belongs to."""

    attributes: Optional[Dict[str, str]] = None
    """Dictionary of additional attributes for the relation."""

    obj_from: EntityTypes | ObjectReference = Field(...)
    """The source entity or reference in the relation."""

    obj_to: EntityTypes | ObjectReference = Field(...)
    """The target entity or reference in the relation."""
487
488
[docs]
class Actor(Entity):
    """
    Actor represents an individual or group involved in an event, activity, or system.

    This class extends the Entity base class and includes additional fields specific to actors.

    Example:
        >>> actor_type = ActorTypes.INDIVIDUAL.value
        >>> actor = Actor(
        ...     name='John Doe',
        ...     type=actor_type
        ... )
        >>> print(actor.name)
        John Doe
    """

    type: ActorType
    """The type definition for the actor."""

    colander_internal_type: Literal["actor"] = "actor"
    """Internal type discriminator for (de)serialization."""

    attributes: Optional[Dict[str, str]] = None
    """Dictionary of additional attributes for the actor."""
513
514
[docs]
class Device(Entity):
    """
    Device represents a physical or virtual device in Colander.

    This class extends the Entity base class and includes additional fields specific to devices,
    such as their type, attributes, and the actor operating the device.

    Example:
        >>> device_type = DeviceTypes.MOBILE.value
        >>> actor = Actor(name='John Doe', type=ActorTypes.INDIVIDUAL.value)
        >>> device = Device(
        ...     name="John's Phone",
        ...     type=device_type,
        ...     operated_by=actor,
        ...     attributes={'os': 'Android', 'version': '12'}
        ... )
        >>> print(device.name)
        John's Phone
    """

    type: DeviceType
    """The type definition for the device."""

    attributes: Optional[Dict[str, str]] = None
    """Dictionary of additional attributes for the device."""

    # Holds either an Actor instance or an ObjectReference; the ColanderType
    # unlink_references()/resolve_references() methods switch between the two forms.
    operated_by: Optional[Actor] | Optional[ObjectReference] = None
    """Reference to the actor operating the device."""

    # Fixed literal tag; EntityTypes uses this field as its discriminator.
    colander_internal_type: Literal["device"] = "device"
    """Internal type discriminator for (de)serialization."""
546
547
[docs]
class Artifact(Entity):
    """
    Artifact represents a file or data object, such as a document, image, or binary, within the system.

    This class extends the Entity base class and includes additional fields specific to artifacts,
    such as type, attributes, extraction source, file metadata, and cryptographic hashes.

    Example:
        >>> artifact_type = ArtifactTypes.DOCUMENT.value
        >>> device_type = DeviceTypes.LAPTOP.value
        >>> device = Device(name='Analyst Laptop', type=device_type)
        >>> artifact = Artifact(
        ...     name='malware_sample.pdf',
        ...     type=artifact_type,
        ...     extracted_from=device,
        ...     extension='pdf',
        ...     original_name='invoice.pdf',
        ...     mime_type='application/pdf',
        ...     md5='d41d8cd98f00b204e9800998ecf8427e',
        ...     sha1='da39a3ee5e6b4b0d3255bfef95601890afd80709',
        ...     sha256='e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855',
        ...     size_in_bytes=12345
        ... )
        >>> print(artifact.name)
        malware_sample.pdf
    """

    type: ArtifactType
    """The type definition for the artifact."""

    attributes: Optional[Dict[str, str]] = None
    """Dictionary of additional attributes for the artifact."""

    # Holds either a Device instance or an ObjectReference; the ColanderType
    # unlink_references()/resolve_references() methods switch between the two forms.
    extracted_from: Optional[Device] | Optional[ObjectReference] = None
    """Reference to the device from which this artifact was extracted."""

    extension: str | None = None
    """The file extension of the artifact, if applicable."""

    original_name: str | None = None
    """The original name of the artifact before ingestion."""

    mime_type: str | None = None
    """The MIME type of the artifact."""

    detached_signature: str | None = None
    """Optional detached signature for the artifact."""

    # The hash fields are plain optional strings: no format or length constraint is declared here.
    md5: str | None = None
    """MD5 hash of the artifact."""

    sha1: str | None = None
    """SHA1 hash of the artifact."""

    sha256: str | None = None
    """SHA256 hash of the artifact."""

    # NonNegativeInt rejects negative sizes; 0 is the default when no size is given.
    size_in_bytes: NonNegativeInt = 0
    """The size of the artifact in bytes."""

    # Fixed literal tag; EntityTypes uses this field as its discriminator.
    colander_internal_type: Literal["artifact"] = "artifact"
    """Internal type discriminator for (de)serialization."""
610
611
[docs]
class DataFragment(Entity):
    """
    DataFragment represents a fragment of data, such as a code snippet, text, or other content.

    This class extends the Entity base class and includes additional fields specific to data fragments,
    such as their type, content, and the artifact from which they were extracted.

    Example:
        >>> data_fragment_type = DataFragmentTypes.CODE.value
        >>> artifact = Artifact(
        ...     name='example_artifact',
        ...     type=ArtifactTypes.DOCUMENT.value
        ... )
        >>> data_fragment = DataFragment(
        ...     name='Sample Code',
        ...     type=data_fragment_type,
        ...     content='print("Hello, World!")',
        ...     extracted_from=artifact
        ... )
        >>> print(data_fragment.content)
        print("Hello, World!")
    """

    type: DataFragmentType
    """The type definition for the data fragment."""

    # Required field: no default value is declared.
    content: str
    """The content of the data fragment."""

    # Holds either an Artifact instance or an ObjectReference; the ColanderType
    # unlink_references()/resolve_references() methods switch between the two forms.
    extracted_from: Optional[Artifact] | Optional[ObjectReference] = None
    """Reference to the artifact from which this data fragment was extracted."""

    # Fixed literal tag; EntityTypes uses this field as its discriminator.
    colander_internal_type: Literal["datafragment"] = "datafragment"
    """Internal type discriminator for (de)serialization."""
646
647
[docs]
class Threat(Entity):
    """
    Threat represents a threat entity, such as a malware family, campaign, or adversary.

    This class extends the Entity base class and includes a type field for threat classification.
    It declares no reference field of its own.

    Example:
        >>> threat_type = ThreatTypes.TROJAN.value
        >>> threat = Threat(
        ...     name='Emotet',
        ...     type=threat_type
        ... )
        >>> print(threat.name)
        Emotet
    """

    type: ThreatType
    """The type definition for the threat."""

    # Fixed literal tag; EntityTypes uses this field as its discriminator.
    colander_internal_type: Literal["threat"] = "threat"
    """Internal type discriminator for (de)serialization."""
669
670
[docs]
class Observable(Entity):
    """
    Observable represents an entity that can be observed or detected within the system.

    This class extends the Entity base class and includes additional fields specific to observables,
    such as classification, raw value, extraction source, associated threat, and operator.

    Example:
        >>> ot = ObservableTypes.IPV4.value
        >>> obs = Observable(
        ...     name='1.2.3.4',
        ...     type=ot,
        ...     classification='malicious',
        ...     raw_value='1.2.3.4',
        ...     attributes={'asn': 'AS123'}
        ... )
        >>> print(obs.name)
        1.2.3.4
    """

    # Required field (Field(...) declares no default).
    type: ObservableType = Field(...)
    """The type definition for the observable."""

    attributes: Optional[Dict[str, str]] = None
    """Dictionary of additional attributes for the observable."""

    # Free-form label limited to 512 characters.
    classification: str | None = Field(default=None, max_length=512)
    """Optional classification label for the observable."""

    raw_value: str | None = None
    """The raw value associated with the observable."""

    # The three fields below each hold either a model instance or an ObjectReference; the
    # ColanderType unlink_references()/resolve_references() methods switch between the two forms.
    extracted_from: Optional[Artifact] | Optional[ObjectReference] = None
    """Reference to the artifact from which this observable was extracted."""

    associated_threat: Optional[Threat] | Optional[ObjectReference] = None
    """Reference to an associated threat."""

    operated_by: Optional[Actor] | Optional[ObjectReference] = None
    """Reference to the actor operating this observable."""

    # Fixed literal tag; EntityTypes uses this field as its discriminator.
    colander_internal_type: Literal["observable"] = "observable"
    """Internal type discriminator for (de)serialization."""
714
715
[docs]
class DetectionRule(Entity):
    """
    DetectionRule represents a rule used for detecting specific content or logic related to observables or
    object references.

    This class is designed to encapsulate detection rules that can be applied across various systems or platforms to
    identify patterns or conditions defined by the user.

    Example:
        >>> drt = DetectionRuleTypes.YARA.value
        >>> rule = DetectionRule(
        ...     name='Detect Malicious IP',
        ...     type=drt,
        ...     content='rule malicious_ip { condition: true }',
        ... )
        >>> print(rule.name)
        Detect Malicious IP
    """

    type: DetectionRuleType
    """The type definition for the detection rule."""

    # Required field: no default value is declared.
    content: str
    """The content or logic of the detection rule."""

    # Defaults to None (not an empty list), so code iterating this field must handle None.
    # Elements are either Observable instances or ObjectReference values.
    targeted_observables: Optional[List[Observable]] | Optional[List[ObjectReference]] = None
    """List of observables or references targeted by this detection rule."""

    # Fixed literal tag; EntityTypes uses this field as its discriminator.
    colander_internal_type: Literal["detectionrule"] = "detectionrule"
    """Internal type discriminator for (de)serialization."""
746
747
[docs]
class Event(Entity):
    """
    Event represents an occurrence or activity observed within a system, such as a detection, alert, or log entry.

    This class extends the Entity base class and includes additional fields specific to events,
    such as timestamps, count, involved observables, and references to related entities.

    Example:
        >>> et = EventTypes.HIT.value
        >>> obs_type = ObservableTypes.IPV4.value
        >>> obs = Observable(
        ...     id=uuid4(),
        ...     name='8.8.8.8',
        ...     type=obs_type
        ... )
        >>> event = Event(
        ...     name='Suspicious Connection',
        ...     type=et,
        ...     first_seen=datetime(2024, 6, 1, 12, 0, tzinfo=UTC),
        ...     last_seen=datetime(2024, 6, 1, 12, 5, tzinfo=UTC),
        ...     involved_observables=[obs]
        ... )
        >>> print(event.name)
        Suspicious Connection
    """

    type: EventType
    """The type definition for the event."""

    attributes: Optional[Dict[str, str]] = None
    """Dictionary of additional attributes for the event."""

    # default_factory evaluates "now" for each new instance. A plain default would be frozen at
    # import time, which made Event(first_seen=<now>) fail _check_dates against the stale last_seen.
    first_seen: datetime = Field(default_factory=lambda: datetime.now(UTC))
    """The timestamp when the event was first observed."""

    last_seen: datetime = Field(default_factory=lambda: datetime.now(UTC))
    """The timestamp when the event was last observed."""

    count: PositiveInt = 1
    """The number of times this event was observed."""

    extracted_from: Optional[Artifact] | Optional[ObjectReference] = None
    """Reference to the artifact from which this event was extracted."""

    observed_on: Optional[Device] | Optional[ObjectReference] = None
    """Reference to the device on which this event was observed."""

    detected_by: Optional[DetectionRule] | Optional[ObjectReference] = None
    """Reference to the detection rule that detected this event."""

    # ToDo: missing attribute in Colander implementation
    attributed_to: Optional[Actor] | Optional[ObjectReference] = None
    """Reference to the actor attributed to this event."""

    # ToDo: missing attribute in Colander implementation
    target: Optional[Actor] | Optional[ObjectReference] = None
    """Reference to the actor targeted during this event."""

    involved_observables: List[Observable] | List[ObjectReference] = []
    """List of observables or references involved in this event."""

    colander_internal_type: Literal["event"] = "event"
    """Internal type discriminator for (de)serialization."""

    @model_validator(mode="after")
    def _check_dates(self) -> Any:
        """Rejects events whose first_seen is later than last_seen (equal timestamps are accepted).

        Raises:
            ValueError: If first_seen is strictly after last_seen.
        """
        if self.first_seen > self.last_seen:
            raise ValueError("first_seen must be before last_seen")
        return self
817
818
[docs]
class ColanderRepository(object, metaclass=Singleton):
    """Singleton store holding every Case, Entity and EntityRelation instance.

    Objects are kept in three dictionaries keyed by the string form of their ID. The repository
    supports insertion (``<<``), lookup (``>>``) and bulk reference resolution/unlinking.
    """

    cases: Dict[str, Case]
    entities: Dict[str, EntityTypes]
    relations: Dict[str, EntityRelation]

    def __init__(self):
        """Creates the three empty stores (cases, entities, relations)."""
        self.cases, self.entities, self.relations = {}, {}, {}

    def clear(self):
        """Empties the three stores in place."""
        for store in (self.cases, self.entities, self.relations):
            store.clear()

    def __lshift__(self, other: EntityTypes | Case) -> None:
        """Stores an object in the dictionary matching its kind.

        Objects that are neither Entity, EntityRelation nor Case are silently ignored.

        Args:
            other: The object (Entity, EntityRelation, or Case) to insert.
        """
        # Checked in order; the first matching kind wins.
        targets = (
            (Entity, self.entities),
            (EntityRelation, self.relations),
            (Case, self.cases),
        )
        for model_class, store in targets:
            if isinstance(other, model_class):
                store[str(other.id)] = other
                return

    def __rshift__(self, other: str | UUID4) -> EntityTypes | EntityRelation | Case | str | UUID4:
        """Looks an identifier up in entities, then relations, then cases.

        Args:
            other: The string or UUID identifier to look up.

        Returns:
            The stored object, or the identifier itself (unchanged) when nothing matches.
        """
        key = str(other)
        for store in (self.entities, self.relations, self.cases):
            if key in store:
                return store[key]
        return other

    def unlink_references(self):
        """Replaces object references with UUIDs on every stored entity, relation and case."""
        for store in (self.entities, self.relations, self.cases):
            for item in store.values():
                item.unlink_references()

    def resolve_references(self):
        """Replaces UUID references with the matching objects on every stored entity, relation and case."""
        for store in (self.entities, self.relations, self.cases):
            for item in store.values():
                item.resolve_references()
889
890
[docs]
class ColanderFeed(ColanderType):
    """ColanderFeed aggregates entities, relations, and cases for bulk operations or data exchange.

    This class is used to load, manage, and resolve references for collections of model objects.

    Example:
        >>> feed_data = {
        ...     "entities": {
        ...         "204d4590-a3ee-4f24-8eaf-350ec2fa751b": {
        ...             "id": "204d4590-a3ee-4f24-8eaf-350ec2fa751b",
        ...             "name": "Example Observable",
        ...             "type": {"name": "IPv4", "short_name": "IPV4"},
        ...             "super_type": {"short_name": "observable"},
        ...             "colander_internal_type": "observable"
        ...         }
        ...     },
        ...     "relations": {},
        ...     "cases": {}
        ... }
        >>> feed = ColanderFeed.load(feed_data)
        >>> print(list(feed.entities.keys()))
        ['204d4590-a3ee-4f24-8eaf-350ec2fa751b']
    """

    id: UUID4 = Field(frozen=True, default_factory=lambda: uuid4())
    """The unique identifier for the feed."""

    name: str = ""
    """Optional name of the feed."""

    description: str = ""
    """Optional description of the feed."""

    entities: Optional[Dict[str, EntityTypes]] = {}
    """Dictionary of entity objects, keyed by their IDs."""

    relations: Optional[Dict[str, EntityRelation]] = {}
    """Dictionary of entity relations, keyed by their IDs."""

    cases: Optional[Dict[str, Case]] = {}
    """Dictionary of case objects, keyed by their IDs."""

    @staticmethod
    def load(raw_object: dict | list) -> "ColanderFeed":
        """Loads a ColanderFeed from a raw object.

        The raw object is normalized in place before validation: each entity receives a
        ``colander_internal_type`` key derived from its super type short name, and each relation
        that only provides ``obj_from_id`` / ``obj_to_id`` gets them copied to ``obj_from`` / ``obj_to``.

        Args:
            raw_object: The raw data representing the entities, relations and cases to be loaded
                into the feed. Note that it is modified in place (see above).

        Returns:
            The ColanderFeed loaded from the raw object, with its references resolved.

        Raises:
            ValueError: If the key of an entity or relation does not match its ``id`` field.
        """
        if "entities" in raw_object:
            for entity_id, entity in raw_object["entities"].items():
                if entity_id != entity.get("id"):
                    raise ValueError(f"Entity {entity_id} does not match with the ID of {entity}")
                # The lower-cased super type short name is stored as the internal type of the entity.
                entity["colander_internal_type"] = entity["super_type"]["short_name"].lower()
        if "relations" in raw_object:
            for relation_id, relation in raw_object["relations"].items():
                if relation_id != relation.get("id"):
                    raise ValueError(f"Relation {relation_id} does not match with the ID of {relation}")
                # Accept the "*_id" flavor of relation endpoints when the plain one is absent.
                if (
                    "obj_from" not in relation
                    and "obj_to" not in relation
                    and "obj_from_id" in relation
                    and "obj_to_id" in relation
                ):
                    relation["obj_from"] = relation["obj_from_id"]
                    relation["obj_to"] = relation["obj_to_id"]
        entity_feed = ColanderFeed.model_validate(raw_object)
        # A single pass visits every entity, relation and case of the feed.
        entity_feed.resolve_references()
        return entity_feed

    def resolve_references(self, strict=False):
        """Resolves references within entities, relations, and cases.

        Iterates over each entity, relation, and case within the respective collections, calling their
        `resolve_references` method with the given ``strict`` flag.

        Args:
            strict: If True, raises a ValueError when a UUID reference cannot be resolved.
                If False, unresolved references remain as UUIDs.
        """
        for entity in self.entities.values():
            entity.resolve_references(strict=strict)
        for relation in self.relations.values():
            relation.resolve_references(strict=strict)
        for case in self.cases.values():
            case.resolve_references(strict=strict)

    def unlink_references(self) -> None:
        """Unlinks references from all entities, relations, and cases of the feed.

        This method iterates through each entity, relation, and case stored in the `entities`, `relations`,
        and `cases` dictionaries respectively, invoking their `unlink_references()` methods.
        """
        for entity in self.entities.values():  # type: ignore[union-attr]
            entity.unlink_references()
        for relation in self.relations.values():  # type: ignore[union-attr]
            relation.unlink_references()
        for case in self.cases.values():  # type: ignore[union-attr]
            case.unlink_references()

    def contains(self, obj: Any) -> bool:
        """Check if an object exists in the current feed by its identifier.

        This method determines whether a given object (or its identifier) exists
        within any of the feed's collections: entities, relations, or cases.
        It extracts the object's ID and searches across all three collections.

        Args:
            obj (Any): The object to check for existence. Can be:
                - An entity, relation, or case object with an 'id' attribute
                - A string or UUID representing an object ID
                - Any object that can be processed by get_id()

        Returns:
            bool: True if the object exists in entities, relations, or cases;
            False otherwise, including when no identifier can be extracted from ``obj``.

        Example:
            >>> feed = ColanderFeed()
            >>> obs = Observable(name="test", type=ObservableTypes.IPV4.value)
            >>> feed.entities[str(obs.id)] = obs
            >>> feed.contains(obs)
            True
            >>> feed.contains("nonexistent-id")
            False
        """
        object_id = get_id(obj)
        # Test the extracted ID itself: str(None) would be the truthy string "None".
        if object_id is None:
            return False

        key = str(object_id)
        return key in self.entities or key in self.relations or key in self.cases

    def get(self, obj: Any) -> Optional[Union[Case, EntityTypes, EntityRelation]]:
        """Retrieve an object from the feed by its identifier.

        This method searches for an object across all feed collections, in this order: entities,
        relations, cases. The first match is returned.

        Args:
            obj (Any): The object to retrieve. Can be:
                - An entity, relation, or case object with an 'id' attribute
                - A string or UUID representing an object ID
                - Any object that can be processed by get_id()

        Returns:
            Optional[Union[Case, EntityTypes, EntityRelation]]: The found object if it exists
            in any of the collections (entities, relations, or cases), otherwise None.
        """
        object_id = get_id(obj)
        if object_id is None:
            return None

        key = str(object_id)
        for collection in (self.entities, self.relations, self.cases):
            if key in collection:
                return collection[key]

        return None

    def get_by_super_type(self, super_type: "CommonEntitySuperType") -> List[EntityTypes]:
        """Retrieve all entities of the feed belonging to the given super type.

        Args:
            super_type (CommonEntitySuperType): The super type whose ``model_class`` is used to select entities.

        Returns:
            List[EntityTypes]: The entities that are instances of ``super_type.model_class``,
            in the iteration order of the feed.
        """
        return [entity for entity in self.entities.values() if isinstance(entity, super_type.model_class)]

    def get_incoming_relations(self, entity: EntityTypes) -> Dict[str, EntityRelation]:
        """Retrieve all relations where the specified entity is the target (obj_to).

        This method finds all entity relations in the feed where the given entity
        is the destination or target of the relationship. Only fully resolved
        relations are considered.

        Args:
            entity (EntityTypes): The entity to find incoming relations for. Must be an instance of Entity.

        Returns:
            Dict[str, EntityRelation]: A dictionary mapping relation IDs to EntityRelation objects where the entity
            is the target (obj_to).
        """
        assert isinstance(entity, Entity)
        return {
            relation_id: relation
            for relation_id, relation in self.relations.items()
            if relation.is_fully_resolved() and relation.obj_to == entity
        }

    def get_outgoing_relations(self, entity: EntityTypes, exclude_immutables=True) -> Dict[str, EntityRelation]:
        """Retrieve all relations where the specified entity is the source (obj_from).

        This method finds all entity relations in the feed where the given entity
        is the source or origin of the relationship. Only fully resolved
        relations are considered.

        Args:
            entity (EntityTypes): The entity to find outgoing relations for. Must be an instance of Entity.
            exclude_immutables (bool): If True, exclude immutable relations. If False, the relations returned
                by ``entity.get_immutable_relations()`` are included as well.

        Returns:
            Dict[str, EntityRelation]: A dictionary mapping relation IDs to EntityRelation objects where the entity
            is the source (obj_from).
        """
        assert isinstance(entity, Entity)
        relations: Dict[str, EntityRelation] = {}
        if not exclude_immutables:
            # Only the immutable relations of the requested entity are relevant here. Looping over
            # every entity of the feed would also rebind `entity` and break the comparison below.
            relations.update(entity.get_immutable_relations())
        for relation_id, relation in self.relations.items():
            if relation.is_fully_resolved() and relation.obj_from == entity:
                relations[relation_id] = relation
        return relations

    def get_relations(self, entity: EntityTypes, exclude_immutables=True) -> Dict[str, EntityRelation]:
        """Retrieve all relations (both incoming and outgoing) for the specified entity.

        This method combines the results of get_incoming_relations() and
        get_outgoing_relations() to provide a complete view of all relationships
        involving the specified entity, regardless of direction.

        Args:
            entity (EntityTypes): The entity to find all relations for. Must be an instance of Entity.
            exclude_immutables (bool): If True, exclude immutable relations.

        Returns:
            Dict[str, EntityRelation]: A dictionary mapping relation IDs to EntityRelation objects where the entity
            is either the source (obj_from) or target (obj_to).
        """
        assert isinstance(entity, Entity)

        relations: Dict[str, EntityRelation] = {}
        relations.update(self.get_incoming_relations(entity))
        relations.update(self.get_outgoing_relations(entity, exclude_immutables=exclude_immutables))

        return relations

    def filter(
        self,
        maximum_tlp_level: TlpPapLevel,
        include_relations=True,
        include_cases=True,
        exclude_entity_types: Optional[List[EntityTypes]] = None,
    ) -> "ColanderFeed":
        """Filter the feed based on TLP (Traffic Light Protocol) level and optionally include relations and cases.

        This method creates a new ColanderFeed containing only entities whose TLP level is below
        the specified maximum threshold. It can optionally include relations between filtered
        entities and cases associated with the filtered entities. The references of the current
        feed are resolved before filtering.

        Args:
            maximum_tlp_level (TlpPapLevel): The maximum TLP level threshold. Only entities
                with TLP levels strictly below this value will be included.
            include_relations (bool, optional): If True, includes relations where both
                source and target entities are present in the filtered feed. Defaults to True.
            include_cases (bool, optional): If True, includes cases associated with the
                filtered entities, provided their own TLP level is strictly below the threshold.
                Defaults to True.
            exclude_entity_types (Optional[List[EntityTypes]], optional): If provided, entities whose
                exact class is listed are excluded.

        Returns:
            ColanderFeed: A new filtered feed containing entities, relations, and cases that meet the
            specified criteria.
        """
        assert isinstance(maximum_tlp_level, TlpPapLevel)

        excluded_types = exclude_entity_types or []

        self.resolve_references()
        filtered = ColanderFeed(
            name=self.name,
            description=self.description,
        )

        for entity_id, entity in self.entities.items():
            if entity.tlp.value < maximum_tlp_level.value and type(entity) not in excluded_types:
                filtered.entities[entity_id] = entity

        for entity in filtered.entities.values():
            # Only include relations whose both ends survived the filtering
            if include_relations:
                for relation_id, relation in self.get_relations(entity).items():
                    if filtered.contains(relation.obj_from) and filtered.contains(relation.obj_to):
                        filtered.relations[relation_id] = relation
            # Only include the case associated with the entity
            if include_cases:
                if (case := self.get(entity.case)) is not None and case.tlp.value < maximum_tlp_level.value:
                    filtered.cases[str(case.id)] = case

        filtered.resolve_references()
        return filtered
1211
1212
[docs]
class CommonEntitySuperType(BaseModel):
    """
    CommonEntitySuperType defines metadata for a super type of entities in the Colander data model.

    This class is used to represent high-level categories of entities (such as Actor, Artifact, Device, etc.)
    and provides fields for the short name, display name, associated types, and the Python class implementing the entity.
    """

    short_name: str = Field(frozen=True, max_length=32)
    """A short name for the model type."""

    name: str = Field(frozen=True, max_length=512)
    """The name of the model type."""

    types: Optional[List[object]] = Field(default=None, exclude=True)
    """Optional reference to the enum or collection of supported types."""

    model_class: Any = Field(default=None, exclude=True)
    """The Python class associated with this super type (Observable...)."""

    type_class: Any = Field(default=None, exclude=True)
    """The Python class associated with the entity type (ObservableType...)."""

    default_type: Any = Field(default=None, exclude=True)
    """The default entity type (GENERIC...)."""

    def type_by_short_name(self, short_name: str) -> Any:
        """Return the entity type registered under the given short name.

        The upper-cased short name is looked up as an attribute on each object of ``types``;
        the ``value`` of the first match is returned.

        Args:
            short_name: The short name of the type to look up (case-insensitive).

        Returns:
            The ``value`` of the matching type, the ``value`` of ``default_type`` when nothing
            matches, or None when no default type is set either.
        """
        key = short_name.upper()
        # `types` is optional: treat a missing collection as an empty one.
        for t in self.types or []:
            if hasattr(t, key):
                return getattr(t, key).value
        if self.default_type is None:
            return None
        return self.default_type.value

    def __str__(self):
        return self.short_name

    def __repr__(self):
        return self.short_name
1250
1251
[docs]
class CommonEntitySuperTypes(enum.Enum):
    """
    CommonEntitySuperTypes is an enumeration of all super types for entities in the Colander data model.

    Each member of this enum represents a high-level entity category (such as Actor, Artifact, Device, etc.)
    and holds a CommonEntitySuperType instance containing metadata and references to the corresponding
    entity class and its supported types.

    This enum is used for type resolution and validation across the model.

    Example:
        >>> super_type = CommonEntitySuperTypes.ACTOR.value
        >>> print(super_type.name)
        Actor
    """

    ACTOR = CommonEntitySuperType(
        short_name="ACTOR",
        name="Actor",
        model_class=Actor,
        type_class=ActorType,
        default_type=ActorTypes.default,
        types=list(ActorTypes),
    )
    ARTIFACT = CommonEntitySuperType(
        short_name="ARTIFACT",
        name="Artifact",
        model_class=Artifact,
        type_class=ArtifactType,
        default_type=ArtifactTypes.default,
        types=list(ArtifactTypes),
    )
    DATA_FRAGMENT = CommonEntitySuperType(
        short_name="DATAFRAGMENT",
        name="Data fragment",
        model_class=DataFragment,
        type_class=DataFragmentType,
        default_type=DataFragmentTypes.default,
        types=list(DataFragmentTypes),
    )
    DETECTION_RULE = CommonEntitySuperType(
        short_name="DETECTIONRULE",
        name="Detection rule",
        model_class=DetectionRule,
        type_class=DetectionRuleType,
        default_type=DetectionRuleTypes.default,
        types=list(DetectionRuleTypes),
    )
    DEVICE = CommonEntitySuperType(
        short_name="DEVICE",
        name="Device",
        model_class=Device,
        type_class=DeviceType,
        default_type=DeviceTypes.default,
        types=list(DeviceTypes),
    )
    EVENT = CommonEntitySuperType(
        short_name="EVENT",
        name="Event",
        model_class=Event,
        type_class=EventType,
        default_type=EventTypes.default,
        types=list(EventTypes),
    )
    OBSERVABLE = CommonEntitySuperType(
        short_name="OBSERVABLE",
        name="Observable",
        model_class=Observable,
        type_class=ObservableType,
        default_type=ObservableTypes.default,
        types=list(ObservableTypes),
    )
    THREAT = CommonEntitySuperType(
        short_name="THREAT",
        name="Threat",
        model_class=Threat,
        type_class=ThreatType,
        default_type=ThreatTypes.default,
        types=list(ThreatTypes),
    )

    @classmethod
    def by_short_name(cls, short_name: str) -> Optional[CommonEntitySuperType]:
        """Return the super type matching the given short name.

        The lookup is case-insensitive and spaces are treated as underscores. The normalized
        name is first matched against the enum member names (e.g. ``DATA_FRAGMENT``), then
        against the ``short_name`` declared by each super type (e.g. ``DATAFRAGMENT``).

        Args:
            short_name: The member name or declared short name of the super type.

        Returns:
            The matching CommonEntitySuperType, or None if there is no match.
        """
        sn = short_name.replace(" ", "_").upper()
        if sn in cls.__members__:
            return cls[sn].value
        # Some declared short names differ from the member names ("DATAFRAGMENT" vs
        # DATA_FRAGMENT), so fall back on the short name carried by each super type.
        for member in cls:
            if member.value.short_name.upper() == sn:
                return member.value
        return None