1import abc
2import enum
3from datetime import datetime, UTC
4from typing import List, Dict, Optional, Union, Annotated, Literal, get_args, Any
5from uuid import uuid4, UUID
6
7from pydantic import (
8 PositiveInt,
9 NonNegativeInt,
10 UUID4,
11 BaseModel,
12 AnyUrl,
13 computed_field,
14 model_validator,
15 ConfigDict,
16 Field,
17)
18
19from colander_data_converter.base.common import (
20 ObjectReference,
21 TlpPapLevel,
22 Singleton,
23 LRUDict,
24)
25from colander_data_converter.base.types.actor import ActorType, ActorTypes
26from colander_data_converter.base.types.artifact import ArtifactType, ArtifactTypes
27from colander_data_converter.base.types.base import EntityType_T
28from colander_data_converter.base.types.data_fragment import DataFragmentType, DataFragmentTypes
29from colander_data_converter.base.types.detection_rule import DetectionRuleType, DetectionRuleTypes
30from colander_data_converter.base.types.device import DeviceType, DeviceTypes
31from colander_data_converter.base.types.event import EventType, EventTypes
32from colander_data_converter.base.types.observable import ObservableType, ObservableTypes
33from colander_data_converter.base.types.threat import ThreatType, ThreatTypes
34
35resource_package = __name__
36
37
[docs]
38def get_id(obj: Any) -> Optional[UUID4]:
39 """
40 Extracts a UUID4 identifier from the given object.
41
42 Args:
43 obj: The object to extract the UUID from. Can be a string, UUID, or an object with an 'id' attribute.
44
45 Returns:
46 The extracted UUID4 if available, otherwise None.
47 """
48 if not obj:
49 return None
50
51 if isinstance(obj, str):
52 try:
53 return UUID(obj, version=4)
54 except (Exception,): # nosec
55 return None
56 elif isinstance(obj, UUID):
57 return obj
58 elif (obj_id := getattr(obj, "id", None)) is not None:
59 return get_id(obj_id)
60
61 return None
62
63
64# Annotated union type representing all possible entity definitions in the model.
65# This type is used for fields that can accept any of the defined entity classes.
66# The Field discriminator 'colander_internal_type' is used for type resolution during (de)serialization.
67EntityTypes = Annotated[
68 Union[
69 "Actor",
70 "Artifact",
71 "DataFragment",
72 "Observable",
73 "DetectionRule",
74 "Device",
75 "Event",
76 "Threat",
77 ],
78 Field(discriminator="colander_internal_type"),
79]
80
81
82# noinspection PyTypeChecker,PyBroadException
[docs]
83class ColanderType(BaseModel):
84 """Base class for all Colander model data_types, providing common functionality.
85
86 This class extends Pydantic's BaseModel and is intended to be subclassed by
87 all model entities. It includes methods for linking and unlinking object references,
88 resolving type hints, and extracting subclass information.
89 """
90
91 model_config: ConfigDict = ConfigDict(str_strip_whitespace=True, arbitrary_types_allowed=True, from_attributes=True)
92
[docs]
93 def model_post_init(self, __context):
94 """Executes post-initialization logic for the model, ensuring the repository
95 registers the current subclass instance.
96
97 Args:
98 __context (Any): Additional context provided for post-initialization handling.
99 """
100 _ = ColanderRepository()
101 _ << self
102
103 def _process_reference_fields(self, operation, strict=False):
104 """Helper method to process reference fields for both unlinking and resolving operations.
105
106 Args:
107 operation: The operation to perform, either 'unlink' or 'resolve'.
108 strict: If True, raises a ValueError when a UUID reference cannot be resolved.
109 Only used for 'resolve' operation. Defaults to False.
110
111 Raises:
112 ValueError: If strict is True, and a UUID reference cannot be resolved.
113 AttributeError: If the class instance does not have the expected field or attribute.
114 """
115 for field, info in self.__class__.model_fields.items():
116 annotation_args = get_args(info.annotation)
117 if ObjectReference in annotation_args:
118 ref = getattr(self, field)
119 if operation == "unlink" and ref and type(ref) is not UUID:
120 setattr(self, field, ref.id)
121 elif operation == "resolve" and type(ref) is UUID:
122 x = ColanderRepository() >> ref
123 if strict and isinstance(x, UUID):
124 raise ValueError(f"Unable to resolve UUID reference {x}")
125 setattr(self, field, x)
126 elif List[ObjectReference] in annotation_args:
127 refs = getattr(self, field)
128 new_refs = []
129 _update = False
130 for ref in refs:
131 if operation == "unlink" and ref and type(ref) is not UUID:
132 new_refs.append(ref.id)
133 _update = True
134 elif operation == "resolve" and type(ref) is UUID:
135 x = ColanderRepository() >> ref
136 if strict and isinstance(x, UUID):
137 raise ValueError(f"Unable to resolve UUID reference {x}")
138 new_refs.append(x)
139 _update = True
140 if _update:
141 setattr(self, field, new_refs)
142
[docs]
143 def unlink_references(self):
144 """Unlinks object references by replacing them with their respective UUIDs.
145
146 This method updates the model fields of the class instance where
147 fields annotated as `ObjectReference` or `List[ObjectReference]` exist. It replaces the
148 references (of type objects) with their UUIDs if they exist.
149
150 For fields of type `ObjectReference`, the method retrieves the field's value and replaces
151 it with its `id` (UUID) if the current value is not already a UUID.
152
153 For fields of type `List[ObjectReference]`, the method iterates through the list and
154 replaces each object reference with its `id` (UUID) if the current value is
155 not already a UUID. The field value is updated only if at least one
156 replacement occurs.
157
158 Raises:
159 AttributeError: If the class instance does not have the expected field or attribute.
160 """
161 self._process_reference_fields("unlink")
162
[docs]
163 def resolve_references(self, strict=False):
164 """Resolves references for the fields in the object's model.
165
166 Fields annotated with `ObjectReference` or `List[ObjectReference]` are processed
167 to fetch and replace their UUID references with respective entities using the `Repository`.
168
169 This method updates the object in-place.
170
171 Args:
172 strict: If True, raises a ValueError when a UUID reference cannot be resolved.
173 If False, unresolved references remain as UUIDs.
174
175 Raises:
176 ValueError: If strict is True and a UUID reference cannot be resolved.
177 """
178 self._process_reference_fields("resolve", strict)
179
[docs]
180 def is_fully_resolved(self) -> bool:
181 """
182 Checks whether all object references in the model are fully resolved.
183
184 This method verifies that all fields annotated as `ObjectReference` or `List[ObjectReference]`
185 do not contain unresolved UUIDs, indicating that references have been replaced with actual objects.
186
187 Returns:
188 bool: True if all references are resolved to objects, False if any remain as UUIDs.
189 """
190 self.resolve_references()
191
192 for field, info in self.__class__.model_fields.items():
193 annotation_args = get_args(info.annotation)
194 if ObjectReference in annotation_args:
195 ref = getattr(self, field)
196 if isinstance(ref, UUID):
197 return False
198 elif List[ObjectReference] in annotation_args:
199 refs = getattr(self, field)
200 for ref in refs:
201 if isinstance(ref, UUID):
202 return False
203 return True
204
[docs]
205 def has_property(self, property_name: str) -> bool:
206 """
207 Checks if the model has a field with the given property name.
208
209 Args:
210 property_name: The name of the property to check.
211
212 Returns:
213 True if the property exists in the model fields, False otherwise.
214 """
215 return property_name in self.__class__.model_fields
216
[docs]
217 def define_arbitrary_property(self, property_name: str, value: Any):
218 """
219 Defines an arbitrary property on the model instance if it does not already exist.
220
221 Args:
222 property_name: The name of the property to define.
223 value: The value to assign to the property.
224 """
225 if not self.has_property(property_name):
226 setattr(self, property_name, value)
227
[docs]
228 @classmethod
229 def subclasses(cls) -> Dict[str, type["EntityTypes"]]:
230 """Generates a dictionary containing all subclasses of the current class.
231
232 This method collects all the direct subclasses of the current class and maps their
233 names (converted to lowercase) to the class itself. It is primarily useful for
234 organizing and accessing class hierarchies dynamically.
235
236 Returns:
237 A dictionary where the keys are the lowercase names of the subclasses, and
238 the values are the subclass data_types themselves.
239 """
240 subclasses = {}
241 for subclass in cls.__subclasses__():
242 subclasses[subclass.__name__.lower()] = subclass
243 return subclasses
244
[docs]
245 @classmethod
246 def resolve_type(cls, content_type: str) -> type["EntityTypes"]:
247 """Resolves a specific type of entity definition based on the provided content type by
248 matching it against the available subclasses of the class. This utility ensures that
249 the given content type is valid and matches one of the registered subclasses.
250
251 Args:
252 content_type: A string representing the type of content to be resolved.
253 Must match the name of a subclass (in lowercase) of the current class.
254
255 Returns:
256 The resolved class type corresponding to the provided content type.
257 """
258 _content_type = content_type.lower()
259 _subclasses = cls.subclasses()
260 assert _content_type in _subclasses
261 return _subclasses[_content_type]
262
[docs]
263 @classmethod
264 def extract_type_hints(cls, obj: dict) -> str:
265 """Extracts type hints from a given dictionary based on specific keys.
266
267 This class method attempts to retrieve type hints from a dictionary using a specific
268 key ("colander_internal_type") or nested keys ("super_type" and its "short_name" value).
269 If the dictionary does not match the expected structure or the keys are not available,
270 a ValueError is raised.
271
272 Args:
273 obj: The dictionary from which type hints need to be extracted.
274
275 Returns:
276 A string representing the extracted type hint.
277
278 Raises:
279 ValueError: If the type hint cannot be extracted from the provided dictionary.
280 """
281 try:
282 if "colander_internal_type" in obj:
283 return obj.get("colander_internal_type", "")
284 elif "super_type" in obj:
285 return obj.get("super_type").get("short_name").lower().replace("_", "") # type: ignore[union-attr]
286 except (Exception,): # nosec
287 pass
288 raise ValueError("Unable to extract type hints.")
289
290 @computed_field
291 def super_type(self) -> "CommonEntitySuperType":
292 return self.get_super_type()
293
[docs]
294 def get_super_type(self) -> "CommonEntitySuperType":
295 return CommonEntitySuperType(
296 **{
297 "name": self.__class__.__name__,
298 "short_name": self.__class__.__name__.upper(),
299 "_class": self.__class__,
300 }
301 )
302
303
[docs]
304class Case(ColanderType):
305 """Case represents a collection or grouping of related entities, artifacts, or events.
306
307 This class is used to organize and manage related data, such as incidents, investigations, or projects.
308
309 Example:
310 >>> case = Case(
311 ... name='Investigation Alpha',
312 ... description='Investigation of suspicious activity'
313 ... )
314 >>> print(case.name)
315 Investigation Alpha
316 """
317
318 id: UUID4 = Field(frozen=True, default_factory=lambda: uuid4())
319 """The unique identifier for the case."""
320
321 created_at: datetime = Field(default=datetime.now(UTC), frozen=True)
322 """The timestamp when the case was created."""
323
324 updated_at: datetime = Field(default=datetime.now(UTC))
325 """The timestamp when the case was last updated."""
326
327 name: str = Field(..., min_length=1, max_length=512)
328 """The name of the case."""
329
330 description: str = Field(..., min_length=1)
331 """A description of the case."""
332
333 documentation: str | None = None
334 """Optional documentation or notes for the case."""
335
336 public_key: str | None = None
337 """Optional public key of the case."""
338
339 pap: TlpPapLevel = TlpPapLevel.WHITE
340 """The PAP (Permissible Actions Protocol) level for the case."""
341
342 parent_case: Optional["Case"] | Optional[ObjectReference] = None
343 """Reference to a parent case, if this case is a sub-case."""
344
345 tlp: TlpPapLevel = TlpPapLevel.WHITE
346 """The TLP (Traffic Light Protocol) level for the case."""
347
348 colander_internal_type: Literal["case"] = "case"
349 """Internal type discriminator for (de)serialization."""
350
351
[docs]
352class Entity(ColanderType, abc.ABC):
353 """Entity is an abstract base class representing a core object in the model.
354
355 This class provides common fields for all entities, including identifiers, timestamps, descriptive fields,
356 and references to cases. Examples include actors, artifacts, devices, etc.
357 """
358
359 id: UUID4 = Field(frozen=True, default_factory=lambda: uuid4())
360 """The unique identifier for the entity."""
361
362 created_at: datetime = Field(default=datetime.now(UTC), frozen=True)
363 """The timestamp when the entity was created."""
364
365 updated_at: datetime = Field(default=datetime.now(UTC))
366 """The timestamp when the entity was last updated."""
367
368 name: str = Field(..., min_length=1, max_length=512)
369 """The name of the entity."""
370
371 case: Optional[Case] | Optional[ObjectReference] = None
372 """Reference to the case this entity belongs to."""
373
374 description: str | None = None
375 """A description of the entity."""
376
377 pap: TlpPapLevel = TlpPapLevel.WHITE
378 """The PAP (Permissible Actions Protocol) level for the entity."""
379
380 source_url: str | AnyUrl | None = None
381 """Optional source URL for the entity."""
382
383 tlp: TlpPapLevel = TlpPapLevel.WHITE
384 """The TLP (Traffic Light Protocol) level for the entity."""
385
[docs]
386 def touch(self):
387 """Touch this entity's attributes."""
388 self.updated_at = datetime.now(UTC)
389
[docs]
390 def get_type(self) -> Optional[EntityType_T]:
391 """
392 Returns the type definition for this entity instance.
393
394 This method returns the type definition object (e.g., ObservableType, ActorType, DeviceType).
395
396 Returns:
397 The type definition object for this entity. The specific type depends
398 on the entity subclass (e.g., Observable returns ObservableType, Actor returns ActorType, etc.).
399 """
400 if hasattr(self, "type"):
401 return getattr(self, "type")
402 return None
403
[docs]
404 def get_immutable_relations(
405 self, mapping: Optional[Dict[str, str]] = None, default_name: Optional[str] = None
406 ) -> Dict[str, "EntityRelation"]:
407 """
408 Returns a dictionary of immutable relations derived from the entity's reference fields.
409
410 This method automatically creates EntityRelation objects by inspecting the entity's fields
411 and identifying those annotated as ObjectReference or List[ObjectReference]. These represent
412 the entity's connections to other entities in the knowledge graph, forming the basis for
413 graph traversal and relationship analysis.
414
415 Immutable relations are derived from the entity's structure and cannot be modified directly.
416 They represent inherent relationships defined by the entity's reference fields, such as
417 'extracted_from', 'operated_by', 'associated_threat', etc.
418
419 Args:
420 mapping: A dictionary to customize relation names. Keys should
421 be field names, and values should be the desired relation names. If not provided,
422 field names are converted to human-readable format by replacing underscores with spaces.
423 Defaults to None.
424 default_name: If a mapping is provided but no field mapping was found, the relation
425 will be named 'default_new_name'.
426
427 Returns:
428 A dictionary of EntityRelation objects keyed by their string
429 representation of relation IDs. Each relation represents a connection from this entity
430 to another entity referenced in its fields.
431
432 Note:
433 - The 'case' field is explicitly excluded from relation generation as it represents
434 a grouping mechanism rather than a semantic relationship.
435 - Only fields with actual values (not None or empty) are processed.
436 - Each EntityRelation created has this entity as the source (obj_from) and the
437 referenced entity as the target (obj_to).
438 """
439 name_mapping = mapping or {}
440 relations: Dict[str, "EntityRelation"] = {}
441 for field_name, field_info in self.__class__.model_fields.items():
442 if field_name == "case":
443 continue
444 field_annotation = get_args(field_info.annotation)
445 field_value = getattr(self, field_name, None)
446
447 if not field_value or not field_annotation:
448 continue
449
450 # Handle single ObjectReference
451 if ObjectReference in field_annotation:
452 relation_name = name_mapping.get(field_name, default_name or field_name)
453 relation = EntityRelation(
454 name=relation_name,
455 obj_from=self,
456 obj_to=field_value,
457 )
458 relations[str(relation.id)] = relation
459
460 # Handle List[ObjectReference]
461 elif List[ObjectReference] in field_annotation:
462 for object_reference in field_value:
463 relation_name = name_mapping.get(field_name, default_name or field_name)
464 relation = EntityRelation(
465 name=relation_name,
466 obj_from=self,
467 obj_to=object_reference,
468 )
469 relations[str(relation.id)] = relation
470
471 return relations
472
485
[docs]
486 def add_attributes(self, attributes: Dict[str, str]):
487 if not attributes or not hasattr(self, "attributes"):
488 return
489 entity_attributes = getattr(self, "attributes", {}) or {}
490 entity_attributes.update(attributes)
491 self.attributes = entity_attributes
492
493 def __hash__(self) -> int:
494 return hash(self.id)
495
496
[docs]
497class EntityRelation(ColanderType):
498 """EntityRelation represents a relationship between two entities in the model.
499
500 This class is used to define and manage relationships between objects, such as associations
501 between observables, devices, or actors.
502
503 Example:
504 >>> obs1 = Observable(
505 ... id=uuid4(),
506 ... name='1.1.1.1',
507 ... type=ObservableTypes.IPV4.value
508 ... )
509 >>> obs2 = Observable(
510 ... id=uuid4(),
511 ... name='8.8.8.8',
512 ... type=ObservableTypes.IPV4.value
513 ... )
514 >>> relation = EntityRelation(
515 ... id=uuid4(),
516 ... name='connection',
517 ... obj_from=obs1,
518 ... obj_to=obs2
519 ... )
520 >>> print(relation.name)
521 connection
522 """
523
524 id: UUID4 = Field(frozen=True, default_factory=lambda: uuid4())
525 """The unique identifier for the entity relation."""
526
527 created_at: datetime = Field(default=datetime.now(UTC), frozen=True)
528 """The timestamp when the entity relation was created."""
529
530 updated_at: datetime = Field(default=datetime.now(UTC))
531 """The timestamp when the entity relation was last updated."""
532
533 name: str = Field(..., min_length=1, max_length=512)
534 """The name of the entity relation."""
535
536 case: Optional[Case] | Optional[ObjectReference] = None
537 """Reference to the case this relation belongs to."""
538
539 attributes: Optional[Dict[str, str]] = None
540 """Dictionary of additional attributes for the relation."""
541
542 obj_from: EntityTypes | ObjectReference = Field(...)
543 """The source entity or reference in the relation."""
544
545 obj_to: EntityTypes | ObjectReference = Field(...)
546 """The target entity or reference in the relation."""
547
[docs]
548 def touch(self):
549 """Touch this relation's attributes."""
550 self.updated_at = datetime.now(UTC)
551
552
[docs]
553class Actor(Entity):
554 """
555 Actor represents an individual or group involved in an event, activity, or system.
556
557 This class extends the Entity base class and includes additional fields specific to actors.
558
559 Example:
560 >>> actor_type = ActorTypes.INDIVIDUAL.value
561 >>> actor = Actor(
562 ... name='John Doe',
563 ... type=actor_type
564 ... )
565 >>> print(actor.name)
566 John Doe
567 """
568
569 type: ActorType
570 """The type definition for the actor."""
571
572 colander_internal_type: Literal["actor"] = "actor"
573 """Internal type discriminator for (de)serialization."""
574
575 attributes: Optional[Dict[str, str]] = None
576 """Dictionary of additional attributes for the device."""
577
578
[docs]
579class Device(Entity):
580 """
581 Device represents a physical or virtual device in Colander.
582
583 This class extends the Entity base class and includes additional fields specific to devices,
584 such as their type, attributes, and the actor operating the device.
585
586 Example:
587 >>> device_type = DeviceTypes.MOBILE.value
588 >>> actor = Actor(name='John Doe', type=ActorTypes.INDIVIDUAL.value)
589 >>> device = Device(
590 ... name="John's Phone",
591 ... type=device_type,
592 ... operated_by=actor,
593 ... attributes={'os': 'Android', 'version': '12'}
594 ... )
595 >>> print(device.name)
596 John's Phone
597 """
598
599 type: DeviceType
600 """The type definition for the device."""
601
602 attributes: Optional[Dict[str, str]] = None
603 """Dictionary of additional attributes for the device."""
604
605 operated_by: Optional[Actor] | Optional[ObjectReference] = None
606 """Reference to the actor operating the device."""
607
608 colander_internal_type: Literal["device"] = "device"
609 """Internal type discriminator for (de)serialization."""
610
611
[docs]
612class Artifact(Entity):
613 """
614 Artifact represents a file or data object, such as a document, image, or binary, within the system.
615
616 This class extends the Entity base class and includes additional fields specific to artifacts,
617 such as type, attributes, extraction source, file metadata, and cryptographic hashes.
618
619 Example:
620 >>> artifact_type = ArtifactTypes.DOCUMENT.value
621 >>> device_type = DeviceTypes.LAPTOP.value
622 >>> device = Device(name='Analyst Laptop', type=device_type)
623 >>> artifact = Artifact(
624 ... name='malware_sample.pdf',
625 ... type=artifact_type,
626 ... extracted_from=device,
627 ... extension='pdf',
628 ... original_name='invoice.pdf',
629 ... mime_type='application/pdf',
630 ... md5='d41d8cd98f00b204e9800998ecf8427e',
631 ... sha1='da39a3ee5e6b4b0d3255bfef95601890afd80709',
632 ... sha256='e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855',
633 ... size_in_bytes=12345
634 ... )
635 >>> print(artifact.name)
636 malware_sample.pdf
637 """
638
639 type: ArtifactType
640 """The type definition for the artifact."""
641
642 attributes: Optional[Dict[str, str]] = None
643 """Dictionary of additional attributes for the artifact."""
644
645 extracted_from: Optional[Device] | Optional[ObjectReference] = None
646 """Reference to the device from which this artifact was extracted."""
647
648 extension: str | None = None
649 """The file extension of the artifact, if applicable."""
650
651 original_name: str | None = None
652 """The original name of the artifact before ingestion."""
653
654 mime_type: str | None = None
655 """The MIME type of the artifact."""
656
657 detached_signature: str | None = None
658 """Optional detached signature for the artifact."""
659
660 md5: str | None = None
661 """MD5 hash of the artifact."""
662
663 sha1: str | None = None
664 """SHA1 hash of the artifact."""
665
666 sha256: str | None = None
667 """SHA256 hash of the artifact."""
668
669 size_in_bytes: NonNegativeInt = 0
670 """The size of the artifact in bytes."""
671
672 colander_internal_type: Literal["artifact"] = "artifact"
673 """Internal type discriminator for (de)serialization."""
674
675
[docs]
676class DataFragment(Entity):
677 """
678 DataFragment represents a fragment of data, such as a code snippet, text, or other content.
679
680 This class extends the Entity base class and includes additional fields specific to data fragments,
681 such as their type, content, and the artifact from which they were extracted.
682
683 Example:
684 >>> data_fragment_type = DataFragmentTypes.CODE.value
685 >>> artifact = Artifact(
686 ... name='example_artifact',
687 ... type=ArtifactTypes.DOCUMENT.value
688 ... )
689 >>> data_fragment = DataFragment(
690 ... name='Sample Code',
691 ... type=data_fragment_type,
692 ... content='print("Hello, World!")',
693 ... extracted_from=artifact
694 ... )
695 >>> print(data_fragment.content)
696 print("Hello, World!")
697 """
698
699 type: DataFragmentType
700 """The type definition for the data fragment."""
701
702 content: str | None = None
703 """The content of the data fragment."""
704
705 extracted_from: Optional[Artifact] | Optional[ObjectReference] = None
706 """Reference to the artifact from which this data fragment was extracted."""
707
708 colander_internal_type: Literal["datafragment"] = "datafragment"
709 """Internal type discriminator for (de)serialization."""
710
711
[docs]
712class Threat(Entity):
713 """
714 Threat represents a threat entity, such as a malware family, campaign, or adversary.
715
716 This class extends the Entity base class and includes a type field for threat classification.
717
718 Example:
719 >>> threat_type = ThreatTypes.TROJAN.value
720 >>> threat = Threat(
721 ... name='Emotet',
722 ... type=threat_type
723 ... )
724 >>> print(threat.name)
725 Emotet
726 """
727
728 type: ThreatType
729 """The type definition for the threat."""
730
731 colander_internal_type: Literal["threat"] = "threat"
732 """Internal type discriminator for (de)serialization."""
733
734
[docs]
735class Observable(Entity):
736 """
737 Observable represents an entity that can be observed or detected within the system.
738
739 This class extends the Entity base class and includes additional fields specific to observables,
740 such as classification, raw value, extraction source, associated threat, and operator.
741
742 Example:
743 >>> ot = ObservableTypes.IPV4.value
744 >>> obs = Observable(
745 ... name='1.2.3.4',
746 ... type=ot,
747 ... classification='malicious',
748 ... raw_value='1.2.3.4',
749 ... attributes={'asn': 'AS123'}
750 ... )
751 >>> print(obs.name)
752 1.2.3.4
753 """
754
755 type: ObservableType = Field(...)
756 """The type definition for the observable."""
757
758 attributes: Optional[Dict[str, str]] = None
759 """Dictionary of additional attributes for the observable."""
760
761 classification: str | None = Field(default=None, max_length=512)
762 """Optional classification label for the observable."""
763
764 raw_value: str | None = None
765 """The raw value associated with the observable."""
766
767 extracted_from: Optional[Artifact] | Optional[ObjectReference] = None
768 """Reference to the artifact from which this observable was extracted."""
769
770 associated_threat: Optional[Threat] | Optional[ObjectReference] = None
771 """Reference to an associated threat."""
772
773 operated_by: Optional[Actor] | Optional[ObjectReference] = None
774 """Reference to the actor operating this observable."""
775
776 colander_internal_type: Literal["observable"] = "observable"
777 """Internal type discriminator for (de)serialization."""
778
779
[docs]
780class DetectionRule(Entity):
781 """
782 DetectionRule represents a rule used for detecting specific content or logic related to observables or
783 object references.
784
785 This class is designed to encapsulate detection rules that can be applied across various systems or platforms to
786 identify patterns or conditions defined by the user.
787
788 Example:
789 >>> drt = DetectionRuleTypes.YARA.value
790 >>> rule = DetectionRule(
791 ... name='Detect Malicious IP',
792 ... type=drt,
793 ... content='rule malicious_ip { condition: true }',
794 ... )
795 >>> print(rule.name)
796 Detect Malicious IP
797 """
798
799 type: DetectionRuleType
800 """The type definition for the detection rule."""
801
802 content: str | None = None
803 """The content or logic of the detection rule."""
804
805 targeted_observables: Optional[List[Observable]] | Optional[List[ObjectReference]] = None
806 """List of observables or references targeted by this detection rule."""
807
808 colander_internal_type: Literal["detectionrule"] = "detectionrule"
809 """Internal type discriminator for (de)serialization."""
810
811
[docs]
812class Event(Entity):
813 """
814 Event represents an occurrence or activity observed within a system, such as a detection, alert, or log entry.
815
816 This class extends the Entity base class and includes additional fields specific to events,
817 such as timestamps, count, involved observables, and references to related entities.
818
819 Example:
820 >>> et = EventTypes.HIT.value
821 >>> obs_type = ObservableTypes.IPV4.value
822 >>> obs = Observable(
823 ... id=uuid4(),
824 ... name='8.8.8.8',
825 ... type=obs_type
826 ... )
827 >>> event = Event(
828 ... name='Suspicious Connection',
829 ... type=et,
830 ... first_seen=datetime(2024, 6, 1, 12, 0, tzinfo=UTC),
831 ... last_seen=datetime(2024, 6, 1, 12, 5, tzinfo=UTC),
832 ... involved_observables=[obs]
833 ... )
834 >>> print(event.name)
835 Suspicious Connection
836 """
837
838 type: EventType
839 """The type definition for the event."""
840
841 attributes: Optional[Dict[str, str]] = None
842 """Dictionary of additional attributes for the event."""
843
844 first_seen: datetime = datetime.now(UTC)
845 """The timestamp when the event was first observed."""
846
847 last_seen: datetime = datetime.now(UTC)
848 """The timestamp when the event was last observed."""
849
850 count: PositiveInt = 1
851 """The number of times this event was observed."""
852
853 extracted_from: Optional[Artifact] | Optional[ObjectReference] = None
854 """Reference to the artifact from which this event was extracted."""
855
856 observed_on: Optional[Device] | Optional[ObjectReference] = None
857 """Reference to the device on which this event was observed."""
858
859 detected_by: Optional[DetectionRule] | Optional[ObjectReference] = None
860 """Reference to the detection rule that detected this event."""
861
862 # ToDo: missing attribute in Colander implementation
863 attributed_to: Optional[Actor] | Optional[ObjectReference] = None
864 """Reference to the actor attributed to this event."""
865
866 # ToDo: missing attribute in Colander implementation
867 target: Optional[Actor] | Optional[ObjectReference] = None
868 """Reference to the actor targeted during this event."""
869
870 involved_observables: List[Observable] | List[ObjectReference] = []
871 """List of observables or references involved in this event."""
872
873 colander_internal_type: Literal["event"] = "event"
874 """Internal type discriminator for (de)serialization."""
875
876 @model_validator(mode="after")
877 def _check_dates(self) -> Any:
878 if self.first_seen > self.last_seen:
879 raise ValueError("first_seen must be before last_seen")
880 return self
881
882
[docs]
883class ColanderRepository(object, metaclass=Singleton):
884 """Singleton repository for managing and storing Case, Entity, and EntityRelation objects.
885
886 This class provides centralized storage and reference management for all model instances,
887 supporting insertion, lookup, and reference resolution/unlinking.
888 """
889
890 cases: Dict[str, Case]
891 entities: Dict[str, EntityTypes]
892 relations: Dict[str, EntityRelation]
893
[docs]
894 def __init__(self):
895 """Initializes the repository with empty dictionaries for cases, entities, and relations."""
896 self.cases = LRUDict()
897 self.entities = LRUDict()
898 self.relations = LRUDict()
899
[docs]
900 def clear(self):
901 self.cases.clear()
902 self.entities.clear()
903 self.relations.clear()
904
[docs]
905 def __lshift__(self, other: EntityTypes | Case) -> None:
906 """Inserts an object into the appropriate repository dictionary.
907
908 Args:
909 other: The object (Entity, EntityRelation, or Case) to insert.
910 """
911 if isinstance(other, Entity):
912 self.entities[str(other.id)] = other
913 elif isinstance(other, EntityRelation):
914 self.relations[str(other.id)] = other
915 elif isinstance(other, Case):
916 self.cases[str(other.id)] = other
917
[docs]
918 def __rshift__(self, other: str | UUID4) -> EntityTypes | EntityRelation | Case | str | UUID4:
919 """Retrieves an object by its identifier from entities, relations, or cases.
920
921 Args:
922 other: The string or UUID identifier to look up.
923
924 Returns:
925 The found object or the identifier if not found.
926 """
927 _other = str(other)
928 if _other in self.entities:
929 return self.entities[_other]
930 elif _other in self.relations:
931 return self.relations[_other]
932 elif _other in self.cases:
933 return self.cases[_other]
934 return other
935
[docs]
936 def unlink_references(self):
937 """Unlinks all object references in entities, relations, and cases by replacing them with UUIDs."""
938 for _, entity in self.entities.items():
939 entity.unlink_references()
940 for _, relation in self.relations.items():
941 relation.unlink_references()
942 for _, case in self.cases.items():
943 case.unlink_references()
944
[docs]
945 def resolve_references(self):
946 """Resolves all UUID references in entities, relations, and cases to their corresponding objects."""
947 for _, entity in self.entities.items():
948 entity.resolve_references()
949 for _, relation in self.relations.items():
950 relation.resolve_references()
951 for _, case in self.cases.items():
952 case.resolve_references()
953
954
[docs]
955class ColanderFeed(ColanderType):
956 """ColanderFeed aggregates entities, relations, and cases for bulk operations or data exchange.
957
958 This class is used to load, manage, and resolve references for collections of model objects.
959
960 Example:
961 >>> feed_data = {
962 ... "entities": {
963 ... "204d4590-a3ee-4f24-8eaf-350ec2fa751b": {
964 ... "id": "204d4590-a3ee-4f24-8eaf-350ec2fa751b",
965 ... "name": "Example Observable",
966 ... "type": {"name": "IPv4", "short_name": "IPV4"},
967 ... "super_type": {"short_name": "observable"},
968 ... "colander_internal_type": "observable"
969 ... }
970 ... },
971 ... "relations": {},
972 ... "cases": {}
973 ... }
974 >>> feed = ColanderFeed.load(feed_data)
975 >>> print(list(feed.entities.keys()))
976 ['204d4590-a3ee-4f24-8eaf-350ec2fa751b']
977 """
978
979 id: UUID4 = Field(frozen=True, default_factory=lambda: uuid4())
980 """The unique identifier for the feed."""
981
982 name: str = ""
983 """Optional name of the feed."""
984
985 description: str = ""
986 """Optional description of the feed."""
987
988 entities: Optional[Dict[str, EntityTypes]] = {}
989 """Dictionary of entity objects, keyed by their IDs."""
990
991 relations: Optional[Dict[str, EntityRelation]] = {}
992 """Dictionary of entity relations, keyed by their IDs."""
993
994 cases: Optional[Dict[str, Case]] = {}
995 """Dictionary of case objects, keyed by their IDs."""
996
[docs]
997 @staticmethod
998 def load(raw_object: dict, reset_ids=False, resolve_types=True) -> "ColanderFeed":
999 """Loads an EntityFeed from a raw object, which can be either a dictionary or a list.
1000
1001 Args:
1002 raw_object: The raw data representing the entities and relations to be loaded into the EntityFeed.
1003 reset_ids: If true, resets the ids of the entities and relations to their values.
1004 resolve_types: If True, resolves entity types based on the types enum. Mandatory to find similar entities.
1005
1006 Returns:
1007 The EntityFeed loaded from a raw object.
1008
1009 Raises:
1010 ValueError: If there are inconsistencies in entity IDs or relations.
1011 """
1012 ColanderRepository().clear()
1013
1014 if "entities" in raw_object:
1015 for entity_id, entity in raw_object["entities"].items():
1016 if entity_id != entity.get("id"):
1017 raise ValueError(f"Relation {entity_id} does not match with the ID of {entity}")
1018 entity["colander_internal_type"] = entity["super_type"]["short_name"].lower()
1019 if "relations" in raw_object:
1020 for relation_id, relation in raw_object["relations"].items():
1021 if relation_id != relation.get("id"):
1022 raise ValueError(f"Relation {relation_id} does not match with the ID of {relation}")
1023 if (
1024 "obj_from" not in relation
1025 and "obj_to" not in relation
1026 and "obj_from_id" in relation
1027 and "obj_to_id" in relation
1028 ):
1029 relation["obj_from"] = relation["obj_from_id"]
1030 relation["obj_to"] = relation["obj_to_id"]
1031
1032 if reset_ids:
1033 # feed_objects = raw_object
1034 entities = {}
1035 relations = {}
1036 rewrite_ids = {}
1037 for e in raw_object["entities"].keys():
1038 rewrite_ids[e] = str(uuid4())
1039 for e in raw_object["relations"].keys():
1040 rewrite_ids[e] = str(uuid4())
1041 for entity in raw_object["entities"].values():
1042 for k, v in entity.items():
1043 if isinstance(v, str):
1044 entity[k] = rewrite_ids.get(v, v)
1045 if isinstance(v, list):
1046 entity[k] = [rewrite_ids.get(value, value) for value in v]
1047 entities[entity["id"]] = entity
1048 for relation in raw_object["relations"].values():
1049 for k, v in relation.items():
1050 if isinstance(v, str):
1051 relation[k] = rewrite_ids.get(v, v)
1052 relations[relation["id"]] = relation
1053 raw_object["entities"] = entities
1054 raw_object["relations"] = relations
1055
1056 entity_feed = ColanderFeed.model_validate(raw_object)
1057 if resolve_types:
1058 entity_feed.resolve_types()
1059 entity_feed.resolve_references()
1060 return entity_feed
1061
[docs]
1062 def resolve_types(self):
1063 for entity_id, entity in self.entities.items():
1064 super_type = CommonEntitySuperTypes.by_short_name(entity.super_type.short_name)
1065 entity.type = super_type.types_class.by_short_name(entity.type.short_name)
1066
[docs]
1067 def resolve_references(self, strict=False):
1068 """Resolves references within entities, relations, and cases.
1069
1070 Iterates over each entity, relation, and case within the respective collections, calling their
1071 `resolve_references` method to update them with any referenced data. This helps in synchronizing
1072 internal state with external dependencies or updates.
1073
1074 Args:
1075 strict: If True, raises a ValueError when a UUID reference cannot be resolved.
1076 If False, unresolved references remain as UUIDs.
1077 """
1078 for _, entity in self.entities.items():
1079 entity.resolve_references(strict=strict)
1080 for _, relation in self.relations.items():
1081 relation.resolve_references(strict=strict)
1082 for _, case in self.cases.items():
1083 case.resolve_references(strict=strict)
1084
[docs]
1085 def unlink_references(self) -> None:
1086 """Unlinks references from all entities, relations, and cases within the current context.
1087
1088 This method iterates through each entity, relation, and case stored in the `entities`, `relations`,
1089 and `cases` dictionaries respectively, invoking their `unlink_references()` methods to clear any references
1090 held by these objects. This operation is useful for breaking dependencies or preparing data for deletion
1091 or modification.
1092 """
1093 for _, entity in self.entities.items(): # type: ignore[union-attr]
1094 entity.unlink_references()
1095 for _, relation in self.relations.items(): # type: ignore[union-attr]
1096 relation.unlink_references()
1097 for _, case in self.cases.items(): # type: ignore[union-attr]
1098 case.unlink_references()
1099
[docs]
1100 def contains(self, obj: Any) -> bool:
1101 """Check if an object exists in the current feed by its identifier.
1102
1103 This method determines whether a given object (or its identifier) exists
1104 within any of the feed's collections: entities, relations, or cases.
1105 It extracts the object's ID and searches across all three collections.
1106
1107 Args:
1108 obj: The object to check for existence. Can be:
1109
1110 - An entity, relation, or case object with an 'id' attribute
1111 - A string or UUID representing an object ID
1112 - Any object that get_id can process
1113
1114 Returns:
1115 True if the object exists in entities, relations, or cases;
1116 False otherwise
1117
1118 Example:
1119 >>> feed = ColanderFeed()
1120 >>> obs = Observable(name="test", type=ObservableTypes.IPV4.value)
1121 >>> feed.entities[str(obs.id)] = obs
1122 >>> feed.contains(obs)
1123 True
1124 >>> feed.contains("nonexistent-id")
1125 False
1126 """
1127 object_id = str(get_id(obj))
1128 if not object_id:
1129 return False
1130
1131 if object_id in self.entities:
1132 return True
1133 if object_id in self.relations:
1134 return True
1135 if object_id in self.cases:
1136 return True
1137
1138 return False
1139
[docs]
1140 def add(self, obj: Union[Case, EntityTypes, EntityRelation]):
1141 """
1142 Adds an object to the feed's collection.
1143
1144 Args:
1145 obj: The object to add. Can be a Case, EntityTypes, or EntityRelation.
1146
1147 This method inserts the object into the appropriate dictionary (entities, relations, or cases)
1148 based on its type, using its stringified ID as the key. If the object already exists, it is not overwritten.
1149 """
1150 if isinstance(obj, Entity):
1151 self.entities.setdefault(str(obj.id), obj)
1152 if isinstance(obj, EntityRelation):
1153 self.relations.setdefault(str(obj.id), obj)
1154 if isinstance(obj, Case):
1155 self.cases.setdefault(str(obj.id), obj)
1156
[docs]
1157 def get(self, obj: Any) -> Optional[Union[Case, EntityTypes, EntityRelation]]:
1158 """Retrieve an object from the feed by its identifier.
1159
1160 This method searches for an object across all feed collections (entities, relations, cases)
1161 using the object's ID. It first checks if the object exists using the contains() method,
1162 then attempts to retrieve it from the appropriate collection.
1163
1164 Args:
1165 obj: The object to retrieve. Can be:
1166
1167 - An entity, relation, or case object with an 'id' attribute
1168 - A string or UUID representing an object ID
1169 - Any object that get_id can process
1170
1171 Returns:
1172 The found object if it exists in any of the collections (entities, relations, or cases), otherwise None.
1173 """
1174 if not self.contains(obj):
1175 return None
1176
1177 object_id = str(get_id(obj))
1178
1179 if object_id in self.entities:
1180 return self.entities.get(object_id)
1181 if object_id in self.relations:
1182 return self.relations.get(object_id)
1183 if object_id in self.cases:
1184 return self.cases.get(object_id)
1185
1186 return None
1187
[docs]
1188 def get_by_super_type(self, super_type: "CommonEntitySuperType") -> List[EntityTypes]:
1189 """
1190 Returns a list of entities matching the given super type.
1191
1192 Args:
1193 super_type: The CommonEntitySuperType to filter entities by.
1194
1195 Returns:
1196 A list of entities that are instances of the specified super type's model class.
1197 """
1198 entities = []
1199 for _, entity in self.entities.items():
1200 if isinstance(entity, super_type.model_class):
1201 entities.append(entity)
1202 return entities
1203
[docs]
1204 def remove_relation_duplicates(self):
1205 """
1206 Remove duplicate EntityRelation objects from the repository.
1207
1208 Iterates over all stored relations and identifies duplicates by comparing
1209 the `name`, `obj_from`, and `obj_to` properties. If two distinct relation
1210 instances are semantically identical, the latter discovered instance is
1211 scheduled for removal.
1212 """
1213 duplicates = []
1214 for relation_a in self.relations.values():
1215 for relation_b in self.relations.values():
1216 if (
1217 relation_a != relation_b
1218 and relation_a.name == relation_b.name
1219 and relation_a.obj_from == relation_b.obj_from
1220 and relation_a.obj_to == relation_b.obj_to
1221 ):
1222 duplicates.append(relation_b)
1223 for duplicate in duplicates:
1224 self.relations.pop(str(duplicate.id))
1225
[docs]
1226 def get_incoming_relations(self, entity: EntityTypes) -> Dict[str, EntityRelation]:
1227 """Retrieve all relations where the specified entity is the target (obj_to).
1228
1229 This method finds all entity relations in the feed where the given entity
1230 is the destination or target of the relationship. Only fully resolved
1231 relations are considered to ensure data consistency.
1232
1233 Args:
1234 entity: The entity to find incoming relations for. Must be an instance of Entity.
1235
1236 Returns:
1237 A dictionary mapping relation IDs to EntityRelation objects where the entity is the target (obj_to).
1238 """
1239 assert isinstance(entity, Entity)
1240 relations = {}
1241 for relation_id, relation in self.relations.items():
1242 if not relation.is_fully_resolved():
1243 continue
1244 if relation.obj_to == entity:
1245 relations[relation_id] = relation
1246 return relations
1247
[docs]
1248 def get_outgoing_relations(self, entity: EntityTypes, exclude_immutables=True) -> Dict[str, EntityRelation]:
1249 """Retrieve all relations where the specified entity is the source (obj_from).
1250
1251 This method finds all entity relations in the feed where the given entity
1252 is the source or origin of the relationship. Only fully resolved
1253 relations are considered to ensure data consistency.
1254
1255 Args:
1256 entity: The entity to find outgoing relations for. Must be an instance of Entity.
1257 exclude_immutables: If True, exclude immutable relations.
1258
1259 Returns:
1260 A dictionary mapping relation IDs to EntityRelation objects where the entity is the source (obj_from).
1261 """
1262 relations = {}
1263 if not exclude_immutables:
1264 for _, entity in self.entities.items():
1265 relations.update(entity.get_immutable_relations())
1266 for relation_id, relation in self.relations.items():
1267 if not relation.is_fully_resolved():
1268 continue
1269 if relation.obj_from == entity:
1270 relations[relation_id] = relation
1271 return relations
1272
[docs]
1273 def get_relations(self, entity: EntityTypes, exclude_immutables=True) -> Dict[str, EntityRelation]:
1274 """Retrieve all relations (both incoming and outgoing) for the specified entity.
1275
1276 This method combines the results of get_incoming_relations() and
1277 get_outgoing_relations() to provide a complete view of all relationships
1278 involving the specified entity, regardless of direction.
1279
1280 Args:
1281 entity: The entity to find all relations for. Must be an instance of Entity.
1282 exclude_immutables: If True, exclude immutable relations.
1283
1284 Returns:
1285 A dictionary mapping relation IDs to EntityRelation objects where the entity
1286 is either the source (obj_from) or target (obj_to).
1287 """
1288 assert isinstance(entity, Entity)
1289
1290 relations = {}
1291 relations.update(self.get_incoming_relations(entity))
1292 relations.update(self.get_outgoing_relations(entity, exclude_immutables=exclude_immutables))
1293
1294 return relations
1295
[docs]
1296 def get_entities_similar_to(self, entity: EntityTypes) -> Dict[str, EntityTypes]:
1297 """Find entities in the feed that are similar to the given entity.
1298
1299 This method searches through all entities in the feed to find those that match
1300 specific criteria based on the entity type. The similarity criteria include:
1301
1302 - Same entity type and name for all entities
1303 - For Artifacts: matching SHA256 hash (if available)
1304 - For DataFragments: matching content
1305 - For DetectionRules: matching content
1306 - For Events: matching first_seen and last_seen timestamps
1307
1308 Args:
1309 entity: The entity to find similar matches for. Must be an
1310 instance of one of the supported entity types (Actor, Artifact,
1311 DataFragment, DetectionRule, Device, Event, Observable, Threat).
1312
1313 Returns:
1314 A dictionary mapping entity IDs to
1315 EntityTypes objects that match the similarity criteria. Returns an empty
1316 dictionary if no similar entities are found, or None if the input entity
1317 is invalid.
1318
1319 Note:
1320 - For Artifact entities, the SHA256 hash must be present in the input entity
1321 for comparison to occur
1322 - The method performs exact matches on all criteria - no fuzzy matching
1323 - Entity type comparison uses the entity's type attribute for matching
1324 """
1325 candidates: Dict[str, EntityTypes] = {}
1326
1327 for feed_entity_id, feed_entity in self.entities.items():
1328 match = feed_entity.type == entity.type
1329 if not match:
1330 continue
1331 match &= feed_entity.name == entity.name
1332 if isinstance(entity, CommonEntitySuperTypes.ARTIFACT.value.model_class):
1333 match &= entity.sha256 is not None
1334 match &= feed_entity.sha256 == entity.sha256
1335 if isinstance(entity, CommonEntitySuperTypes.DATA_FRAGMENT.value.model_class):
1336 match &= feed_entity.content == entity.content
1337 if isinstance(entity, CommonEntitySuperTypes.DETECTION_RULE.value.model_class):
1338 match &= feed_entity.content == entity.content
1339 if isinstance(entity, CommonEntitySuperTypes.EVENT.value.model_class):
1340 match &= feed_entity.first_seen == entity.first_seen
1341 match &= feed_entity.last_seen == entity.last_seen
1342 if match:
1343 candidates[feed_entity_id] = feed_entity
1344
1345 return candidates
1346
[docs]
1347 def filter(
1348 self,
1349 maximum_tlp_level: TlpPapLevel,
1350 include_relations=True,
1351 include_cases=True,
1352 exclude_entity_types: Optional[List[EntityTypes]] = None,
1353 ) -> "ColanderFeed":
1354 """Filter the feed based on TLP (Traffic Light Protocol) level and optionally include relations and cases.
1355
1356 This method creates a new ColanderFeed containing only entities whose TLP level is below
1357 the specified maximum threshold. It can optionally include relations between filtered
1358 entities and cases associated with the filtered entities.
1359
1360 Args:
1361 maximum_tlp_level: The maximum TLP level threshold. Only entities
1362 with TLP levels strictly below this value will be included.
1363 include_relations: If True, includes relations where both
1364 source and target entities are present in the filtered feed. Defaults to True.
1365 include_cases: If True, includes cases associated with the filtered entities. Defaults to True.
1366 exclude_entity_types: If provided, entities of these types are excluded.
1367
1368 Returns:
1369 A new filtered feed containing entities, relations, and cases that meet the
1370 specified criteria.
1371 """
1372 assert isinstance(maximum_tlp_level, TlpPapLevel)
1373
1374 excluded_types = exclude_entity_types or []
1375
1376 self.resolve_references()
1377 filtered = ColanderFeed(name=self.name, description=self.description)
1378
1379 for entity_id, entity in self.entities.items():
1380 if entity.tlp.value < maximum_tlp_level.value and type(entity) not in excluded_types:
1381 filtered.entities[entity_id] = entity
1382
1383 for entity_id, entity in filtered.entities.items():
1384 # Only include relations of the entity
1385 if include_relations:
1386 for relation_id, relation in self.get_relations(entity).items():
1387 if filtered.contains(relation.obj_from) and filtered.contains(relation.obj_to):
1388 filtered.relations[relation_id] = relation
1389 # Only include the case associated with the entity
1390 if include_cases:
1391 if (case := self.get(entity.case)) is not None and case.tlp.value < maximum_tlp_level.value:
1392 filtered.cases[str(case.id)] = case
1393
1394 filtered.resolve_references()
1395 return filtered
1396
[docs]
1397 def overwrite_case(self, case: Case):
1398 """
1399 Overwrites the case for all entities and relations in the feed.
1400 This method updates the case reference for all entities and relations in the feed
1401 to the provided case object. The case is also added to the feed's case dictionary.
1402 This is useful when you want to reassign all feed contents to a specific case.
1403
1404 Args:
1405 case: The Case object to assign to all entities and relations in the feed.
1406 """
1407 self.cases[str(case.id)] = case
1408 for _, entity in self.entities.items():
1409 entity.case = case
1410 for _, relation in self.relations.items():
1411 relation.case = case
1412
[docs]
1413 def define_arbitrary_property(self, property_name, value: Any):
1414 """
1415 Defines an arbitrary property on all cases, entities, and relations in the feed.
1416
1417 Args:
1418 property_name: The name of the property to define.
1419 value: The value to assign to the property.
1420 """
1421 for _, case in self.cases.items():
1422 case.define_arbitrary_property(property_name, value)
1423 for _, entity in self.entities.items():
1424 entity.define_arbitrary_property(property_name, value)
1425 for _, relation in self.relations.items():
1426 relation.define_arbitrary_property(property_name, value)
1427
[docs]
1428 def break_immutable_relations(self):
1429 """
1430 Breaks immutable relations by converting object references to explicit relations.
1431 This method iterates through all entities in the feed and converts their immutable
1432 reference fields (those annotated with ObjectReference or List[ObjectReference])
1433 into explicit EntityRelation objects. The original reference fields are then
1434 cleared (set to None for single references or empty list for list references).
1435
1436 This is useful for creating a fully explicit representation of relationships
1437 where all connections are represented as EntityRelation objects rather than
1438 embedded object references.
1439
1440 Note:
1441 This method modifies the feed in-place by:
1442
1443 - Adding new EntityRelation objects to the relation dictionary
1444 - Clearing the original reference fields on entities
1445 """
1446 for _, entity in self.entities.items():
1447 for _, immutable_relation in entity.get_immutable_relations().items():
1448 self.relations[str(immutable_relation.id)] = immutable_relation
1449 object_reference = getattr(entity, immutable_relation.name)
1450 if isinstance(object_reference, list):
1451 setattr(entity, immutable_relation.name, [])
1452 else:
1453 setattr(entity, immutable_relation.name, None)
1454
[docs]
1455 def rebuild_immutable_relations(self):
1456 """
1457 Rebuilds immutable relations by restoring object references from explicit relations.
1458 This method iterates through all entities and their outgoing relations (excluding immutables)
1459 and attempts to restore the original immutable reference fields by setting the appropriate
1460 entity attributes. After successfully restoring a reference, the explicit relation is removed
1461 from the relation dictionary to avoid duplication.
1462
1463 The method handles both single object references and list-based references:
1464
1465 - For list fields: Appends the target object if not already present
1466 - For single fields: Sets the target object if the field is currently None
1467 - For existing matches: Removes the redundant explicit relation
1468
1469 This is typically used after breaking immutable relations to restore the original
1470 entity structure while cleaning up temporary explicit relations.
1471
1472 Note:
1473 This method modifies the feed in-place by updating entity attributes and
1474 removing relations from the relation dictionary.
1475 """
1476 for _, entity in self.entities.items():
1477 for _, relation in self.get_outgoing_relations(entity, exclude_immutables=True).items():
1478 obj_from = relation.obj_from
1479 obj_to = relation.obj_to
1480 if not hasattr(obj_from, relation.name):
1481 continue
1482 actual = getattr(obj_from, relation.name, None)
1483 field_info = obj_from.__class__.model_fields[relation.name]
1484 annotation_args = get_args(field_info.annotation) or [] # type: ignore[var-annotated]
1485 obj_to_type = type(obj_to)
1486 if List[obj_to_type] in annotation_args:
1487 if obj_to not in actual:
1488 actual.append(obj_to)
1489 setattr(obj_from, relation.name, actual)
1490 if obj_to in actual:
1491 self.relations.pop(str(relation.id))
1492 elif obj_to_type in annotation_args:
1493 if actual is None:
1494 setattr(obj_from, relation.name, obj_to)
1495 if obj_to == getattr(obj_from, relation.name, None):
1496 self.relations.pop(str(relation.id))
1497
1498
[docs]
1499class CommonEntitySuperType(BaseModel):
1500 """
1501 CommonEntitySuperType defines metadata for a super type of entities in the Colander data model.
1502
1503 This class is used to represent high-level categories of entities (such as Actor, Artifact, Device, etc.)
1504 and provides fields for the short name, display name, associated types, and the Python class
1505 implementing the entity.
1506 """
1507
1508 model_config: ConfigDict = ConfigDict(str_strip_whitespace=True, arbitrary_types_allowed=True, from_attributes=True)
1509
1510 short_name: str = Field(frozen=True, max_length=32)
1511 """A short name for the model type."""
1512
1513 name: str = Field(frozen=True, max_length=512)
1514 """The name of the model type."""
1515
1516 types: Optional[List[object]] = Field(default=None, exclude=True)
1517 """Optional reference to the enum or collection of supported types."""
1518
1519 model_class: Any = Field(default=None, exclude=True)
1520 """The Python class associated with this super type (Observable...)."""
1521
1522 type_class: Any = Field(default=None, exclude=True)
1523 """The Python class associated with the entity type (ObservableType...)."""
1524
1525 types_class: Any = Field(default=None, exclude=True)
1526 """The Python class associated with the entity types (ObservableTypes...)."""
1527
1528 default_type: Any = Field(default=None, exclude=True)
1529 """The default entity type (GENERIC...)."""
1530
[docs]
1531 def type_by_short_name(self, short_name: str):
1532 for t in self.types:
1533 if hasattr(t, short_name.upper()):
1534 return getattr(t, short_name.upper()).value
1535 return self.default_type.value
1536
1537 def __str__(self):
1538 return self.short_name
1539
1540 def __repr__(self):
1541 return self.short_name
1542
1543
[docs]
1544class CommonEntitySuperTypes(enum.Enum):
1545 """
1546 CommonEntitySuperTypes is an enumeration of all super types for entities in the Colander data model.
1547
1548 Each member of this enum represents a high-level entity category (such as Actor, Artifact, Device, etc.)
1549 and holds a CommonEntitySuperType instance containing metadata and references to the corresponding
1550 entity class and its supported types.
1551
1552 This enum is used for type resolution and validation across the model.
1553
1554 Example:
1555 >>> super_type = CommonEntitySuperTypes.ACTOR.value
1556 >>> print(super_type.name)
1557 Actor
1558 """
1559
1560 ACTOR = CommonEntitySuperType(
1561 short_name="ACTOR",
1562 name="Actor",
1563 model_class=Actor,
1564 type_class=ActorType,
1565 types_class=ActorTypes,
1566 default_type=ActorTypes.default,
1567 types=[t for t in ActorTypes],
1568 )
1569 ARTIFACT = CommonEntitySuperType(
1570 short_name="ARTIFACT",
1571 name="Artifact",
1572 model_class=Artifact,
1573 type_class=ArtifactType,
1574 types_class=ArtifactTypes,
1575 default_type=ArtifactTypes.default,
1576 types=[t for t in ArtifactTypes],
1577 )
1578 DATA_FRAGMENT = CommonEntitySuperType(
1579 short_name="DATAFRAGMENT",
1580 name="Data fragment",
1581 model_class=DataFragment,
1582 type_class=DataFragmentType,
1583 types_class=DataFragmentTypes,
1584 default_type=DataFragmentTypes.default,
1585 types=[t for t in DataFragmentTypes],
1586 )
1587 DETECTION_RULE = CommonEntitySuperType(
1588 short_name="DETECTIONRULE",
1589 name="Detection rule",
1590 model_class=DetectionRule,
1591 type_class=DetectionRuleType,
1592 types_class=DetectionRuleTypes,
1593 default_type=DetectionRuleTypes.default,
1594 types=[t for t in DetectionRuleTypes],
1595 )
1596 DEVICE = CommonEntitySuperType(
1597 short_name="DEVICE",
1598 name="Device",
1599 model_class=Device,
1600 type_class=DeviceType,
1601 types_class=DeviceTypes,
1602 default_type=DeviceTypes.default,
1603 types=[t for t in DeviceTypes],
1604 )
1605 EVENT = CommonEntitySuperType(
1606 short_name="EVENT",
1607 name="Event",
1608 model_class=Event,
1609 type_class=EventType,
1610 types_class=EventTypes,
1611 default_type=EventTypes.default,
1612 types=[t for t in EventTypes],
1613 )
1614 OBSERVABLE = CommonEntitySuperType(
1615 short_name="OBSERVABLE",
1616 name="Observable",
1617 model_class=Observable,
1618 type_class=ObservableType,
1619 types_class=ObservableTypes,
1620 default_type=ObservableTypes.default,
1621 types=[t for t in ObservableTypes],
1622 )
1623 THREAT = CommonEntitySuperType(
1624 short_name="THREAT",
1625 name="Threat",
1626 model_class=Threat,
1627 type_class=ThreatType,
1628 types_class=ThreatTypes,
1629 default_type=ThreatTypes.default,
1630 types=[t for t in ThreatTypes],
1631 )
1632
[docs]
1633 @classmethod
1634 def by_short_name(cls, short_name: str) -> Optional[CommonEntitySuperType]:
1635 sn = short_name.replace(" ", "_").upper()
1636 for member in cls:
1637 if member.value.short_name == sn:
1638 return member.value
1639 return None