Source code for colander_data_converter.base.models

   1import abc
   2import enum
   3from datetime import datetime, UTC
   4from typing import List, Dict, Optional, Union, Annotated, Literal, get_args, Any
   5from uuid import uuid4, UUID
   6
   7from pydantic import (
   8    PositiveInt,
   9    NonNegativeInt,
  10    UUID4,
  11    BaseModel,
  12    AnyUrl,
  13    computed_field,
  14    model_validator,
  15    ConfigDict,
  16    Field,
  17)
  18
  19from colander_data_converter.base.common import (
  20    ObjectReference,
  21    TlpPapLevel,
  22    Singleton,
  23    LRUDict,
  24)
  25from colander_data_converter.base.types.actor import ActorType, ActorTypes
  26from colander_data_converter.base.types.artifact import ArtifactType, ArtifactTypes
  27from colander_data_converter.base.types.base import EntityType_T
  28from colander_data_converter.base.types.data_fragment import DataFragmentType, DataFragmentTypes
  29from colander_data_converter.base.types.detection_rule import DetectionRuleType, DetectionRuleTypes
  30from colander_data_converter.base.types.device import DeviceType, DeviceTypes
  31from colander_data_converter.base.types.event import EventType, EventTypes
  32from colander_data_converter.base.types.observable import ObservableType, ObservableTypes
  33from colander_data_converter.base.types.threat import ThreatType, ThreatTypes
  34
  35resource_package = __name__
  36
  37
[docs] 38def get_id(obj: Any) -> Optional[UUID4]: 39 """ 40 Extracts a UUID4 identifier from the given object. 41 42 Args: 43 obj: The object to extract the UUID from. Can be a string, UUID, or an object with an 'id' attribute. 44 45 Returns: 46 The extracted UUID4 if available, otherwise None. 47 """ 48 if not obj: 49 return None 50 51 if isinstance(obj, str): 52 try: 53 return UUID(obj, version=4) 54 except (Exception,): # nosec 55 return None 56 elif isinstance(obj, UUID): 57 return obj 58 elif (obj_id := getattr(obj, "id", None)) is not None: 59 return get_id(obj_id) 60 61 return None
62 63 64# Annotated union type representing all possible entity definitions in the model. 65# This type is used for fields that can accept any of the defined entity classes. 66# The Field discriminator 'colander_internal_type' is used for type resolution during (de)serialization. 67EntityTypes = Annotated[ 68 Union[ 69 "Actor", 70 "Artifact", 71 "DataFragment", 72 "Observable", 73 "DetectionRule", 74 "Device", 75 "Event", 76 "Threat", 77 ], 78 Field(discriminator="colander_internal_type"), 79] 80 81 82# noinspection PyTypeChecker,PyBroadException
[docs] 83class ColanderType(BaseModel): 84 """Base class for all Colander model data_types, providing common functionality. 85 86 This class extends Pydantic's BaseModel and is intended to be subclassed by 87 all model entities. It includes methods for linking and unlinking object references, 88 resolving type hints, and extracting subclass information. 89 """ 90 91 model_config: ConfigDict = ConfigDict(str_strip_whitespace=True, arbitrary_types_allowed=True, from_attributes=True) 92
[docs] 93 def model_post_init(self, __context): 94 """Executes post-initialization logic for the model, ensuring the repository 95 registers the current subclass instance. 96 97 Args: 98 __context (Any): Additional context provided for post-initialization handling. 99 """ 100 _ = ColanderRepository() 101 _ << self
102 103 def _process_reference_fields(self, operation, strict=False): 104 """Helper method to process reference fields for both unlinking and resolving operations. 105 106 Args: 107 operation: The operation to perform, either 'unlink' or 'resolve'. 108 strict: If True, raises a ValueError when a UUID reference cannot be resolved. 109 Only used for 'resolve' operation. Defaults to False. 110 111 Raises: 112 ValueError: If strict is True, and a UUID reference cannot be resolved. 113 AttributeError: If the class instance does not have the expected field or attribute. 114 """ 115 for field, info in self.__class__.model_fields.items(): 116 annotation_args = get_args(info.annotation) 117 if ObjectReference in annotation_args: 118 ref = getattr(self, field) 119 if operation == "unlink" and ref and type(ref) is not UUID: 120 setattr(self, field, ref.id) 121 elif operation == "resolve" and type(ref) is UUID: 122 x = ColanderRepository() >> ref 123 if strict and isinstance(x, UUID): 124 raise ValueError(f"Unable to resolve UUID reference {x}") 125 setattr(self, field, x) 126 elif List[ObjectReference] in annotation_args: 127 refs = getattr(self, field) 128 new_refs = [] 129 _update = False 130 for ref in refs: 131 if operation == "unlink" and ref and type(ref) is not UUID: 132 new_refs.append(ref.id) 133 _update = True 134 elif operation == "resolve" and type(ref) is UUID: 135 x = ColanderRepository() >> ref 136 if strict and isinstance(x, UUID): 137 raise ValueError(f"Unable to resolve UUID reference {x}") 138 new_refs.append(x) 139 _update = True 140 if _update: 141 setattr(self, field, new_refs) 142 162
[docs] 163 def resolve_references(self, strict=False): 164 """Resolves references for the fields in the object's model. 165 166 Fields annotated with `ObjectReference` or `List[ObjectReference]` are processed 167 to fetch and replace their UUID references with respective entities using the `Repository`. 168 169 This method updates the object in-place. 170 171 Args: 172 strict: If True, raises a ValueError when a UUID reference cannot be resolved. 173 If False, unresolved references remain as UUIDs. 174 175 Raises: 176 ValueError: If strict is True and a UUID reference cannot be resolved. 177 """ 178 self._process_reference_fields("resolve", strict)
179
[docs] 180 def is_fully_resolved(self) -> bool: 181 """ 182 Checks whether all object references in the model are fully resolved. 183 184 This method verifies that all fields annotated as `ObjectReference` or `List[ObjectReference]` 185 do not contain unresolved UUIDs, indicating that references have been replaced with actual objects. 186 187 Returns: 188 bool: True if all references are resolved to objects, False if any remain as UUIDs. 189 """ 190 self.resolve_references() 191 192 for field, info in self.__class__.model_fields.items(): 193 annotation_args = get_args(info.annotation) 194 if ObjectReference in annotation_args: 195 ref = getattr(self, field) 196 if isinstance(ref, UUID): 197 return False 198 elif List[ObjectReference] in annotation_args: 199 refs = getattr(self, field) 200 for ref in refs: 201 if isinstance(ref, UUID): 202 return False 203 return True
204
[docs] 205 def has_property(self, property_name: str) -> bool: 206 """ 207 Checks if the model has a field with the given property name. 208 209 Args: 210 property_name: The name of the property to check. 211 212 Returns: 213 True if the property exists in the model fields, False otherwise. 214 """ 215 return property_name in self.__class__.model_fields
216
[docs] 217 def define_arbitrary_property(self, property_name: str, value: Any): 218 """ 219 Defines an arbitrary property on the model instance if it does not already exist. 220 221 Args: 222 property_name: The name of the property to define. 223 value: The value to assign to the property. 224 """ 225 if not self.has_property(property_name): 226 setattr(self, property_name, value)
227
[docs] 228 @classmethod 229 def subclasses(cls) -> Dict[str, type["EntityTypes"]]: 230 """Generates a dictionary containing all subclasses of the current class. 231 232 This method collects all the direct subclasses of the current class and maps their 233 names (converted to lowercase) to the class itself. It is primarily useful for 234 organizing and accessing class hierarchies dynamically. 235 236 Returns: 237 A dictionary where the keys are the lowercase names of the subclasses, and 238 the values are the subclass data_types themselves. 239 """ 240 subclasses = {} 241 for subclass in cls.__subclasses__(): 242 subclasses[subclass.__name__.lower()] = subclass 243 return subclasses
244
[docs] 245 @classmethod 246 def resolve_type(cls, content_type: str) -> type["EntityTypes"]: 247 """Resolves a specific type of entity definition based on the provided content type by 248 matching it against the available subclasses of the class. This utility ensures that 249 the given content type is valid and matches one of the registered subclasses. 250 251 Args: 252 content_type: A string representing the type of content to be resolved. 253 Must match the name of a subclass (in lowercase) of the current class. 254 255 Returns: 256 The resolved class type corresponding to the provided content type. 257 """ 258 _content_type = content_type.lower() 259 _subclasses = cls.subclasses() 260 assert _content_type in _subclasses 261 return _subclasses[_content_type]
262
[docs] 263 @classmethod 264 def extract_type_hints(cls, obj: dict) -> str: 265 """Extracts type hints from a given dictionary based on specific keys. 266 267 This class method attempts to retrieve type hints from a dictionary using a specific 268 key ("colander_internal_type") or nested keys ("super_type" and its "short_name" value). 269 If the dictionary does not match the expected structure or the keys are not available, 270 a ValueError is raised. 271 272 Args: 273 obj: The dictionary from which type hints need to be extracted. 274 275 Returns: 276 A string representing the extracted type hint. 277 278 Raises: 279 ValueError: If the type hint cannot be extracted from the provided dictionary. 280 """ 281 try: 282 if "colander_internal_type" in obj: 283 return obj.get("colander_internal_type", "") 284 elif "super_type" in obj: 285 return obj.get("super_type").get("short_name").lower().replace("_", "") # type: ignore[union-attr] 286 except (Exception,): # nosec 287 pass 288 raise ValueError("Unable to extract type hints.")
289 290 @computed_field 291 def super_type(self) -> "CommonEntitySuperType": 292 return self.get_super_type() 293
[docs] 294 def get_super_type(self) -> "CommonEntitySuperType": 295 return CommonEntitySuperType( 296 **{ 297 "name": self.__class__.__name__, 298 "short_name": self.__class__.__name__.upper(), 299 "_class": self.__class__, 300 } 301 )
302 303
[docs] 304class Case(ColanderType): 305 """Case represents a collection or grouping of related entities, artifacts, or events. 306 307 This class is used to organize and manage related data, such as incidents, investigations, or projects. 308 309 Example: 310 >>> case = Case( 311 ... name='Investigation Alpha', 312 ... description='Investigation of suspicious activity' 313 ... ) 314 >>> print(case.name) 315 Investigation Alpha 316 """ 317 318 id: UUID4 = Field(frozen=True, default_factory=lambda: uuid4()) 319 """The unique identifier for the case.""" 320 321 created_at: datetime = Field(default=datetime.now(UTC), frozen=True) 322 """The timestamp when the case was created.""" 323 324 updated_at: datetime = Field(default=datetime.now(UTC)) 325 """The timestamp when the case was last updated.""" 326 327 name: str = Field(..., min_length=1, max_length=512) 328 """The name of the case.""" 329 330 description: str = Field(..., min_length=1) 331 """A description of the case.""" 332 333 documentation: str | None = None 334 """Optional documentation or notes for the case.""" 335 336 public_key: str | None = None 337 """Optional public key of the case.""" 338 339 pap: TlpPapLevel = TlpPapLevel.WHITE 340 """The PAP (Permissible Actions Protocol) level for the case.""" 341 342 parent_case: Optional["Case"] | Optional[ObjectReference] = None 343 """Reference to a parent case, if this case is a sub-case.""" 344 345 tlp: TlpPapLevel = TlpPapLevel.WHITE 346 """The TLP (Traffic Light Protocol) level for the case.""" 347 348 colander_internal_type: Literal["case"] = "case" 349 """Internal type discriminator for (de)serialization."""
350 351
[docs] 352class Entity(ColanderType, abc.ABC): 353 """Entity is an abstract base class representing a core object in the model. 354 355 This class provides common fields for all entities, including identifiers, timestamps, descriptive fields, 356 and references to cases. Examples include actors, artifacts, devices, etc. 357 """ 358 359 id: UUID4 = Field(frozen=True, default_factory=lambda: uuid4()) 360 """The unique identifier for the entity.""" 361 362 created_at: datetime = Field(default=datetime.now(UTC), frozen=True) 363 """The timestamp when the entity was created.""" 364 365 updated_at: datetime = Field(default=datetime.now(UTC)) 366 """The timestamp when the entity was last updated.""" 367 368 name: str = Field(..., min_length=1, max_length=512) 369 """The name of the entity.""" 370 371 case: Optional[Case] | Optional[ObjectReference] = None 372 """Reference to the case this entity belongs to.""" 373 374 description: str | None = None 375 """A description of the entity.""" 376 377 pap: TlpPapLevel = TlpPapLevel.WHITE 378 """The PAP (Permissible Actions Protocol) level for the entity.""" 379 380 source_url: str | AnyUrl | None = None 381 """Optional source URL for the entity.""" 382 383 tlp: TlpPapLevel = TlpPapLevel.WHITE 384 """The TLP (Traffic Light Protocol) level for the entity.""" 385
[docs] 386 def touch(self): 387 """Touch this entity's attributes.""" 388 self.updated_at = datetime.now(UTC)
389
[docs] 390 def get_type(self) -> Optional[EntityType_T]: 391 """ 392 Returns the type definition for this entity instance. 393 394 This method returns the type definition object (e.g., ObservableType, ActorType, DeviceType). 395 396 Returns: 397 The type definition object for this entity. The specific type depends 398 on the entity subclass (e.g., Observable returns ObservableType, Actor returns ActorType, etc.). 399 """ 400 if hasattr(self, "type"): 401 return getattr(self, "type") 402 return None
403
[docs] 404 def get_immutable_relations( 405 self, mapping: Optional[Dict[str, str]] = None, default_name: Optional[str] = None 406 ) -> Dict[str, "EntityRelation"]: 407 """ 408 Returns a dictionary of immutable relations derived from the entity's reference fields. 409 410 This method automatically creates EntityRelation objects by inspecting the entity's fields 411 and identifying those annotated as ObjectReference or List[ObjectReference]. These represent 412 the entity's connections to other entities in the knowledge graph, forming the basis for 413 graph traversal and relationship analysis. 414 415 Immutable relations are derived from the entity's structure and cannot be modified directly. 416 They represent inherent relationships defined by the entity's reference fields, such as 417 'extracted_from', 'operated_by', 'associated_threat', etc. 418 419 Args: 420 mapping: A dictionary to customize relation names. Keys should 421 be field names, and values should be the desired relation names. If not provided, 422 field names are converted to human-readable format by replacing underscores with spaces. 423 Defaults to None. 424 default_name: If a mapping is provided but no field mapping was found, the relation 425 will be named 'default_new_name'. 426 427 Returns: 428 A dictionary of EntityRelation objects keyed by their string 429 representation of relation IDs. Each relation represents a connection from this entity 430 to another entity referenced in its fields. 431 432 Note: 433 - The 'case' field is explicitly excluded from relation generation as it represents 434 a grouping mechanism rather than a semantic relationship. 435 - Only fields with actual values (not None or empty) are processed. 436 - Each EntityRelation created has this entity as the source (obj_from) and the 437 referenced entity as the target (obj_to). 438 """ 439 name_mapping = mapping or {} 440 relations: Dict[str, "EntityRelation"] = {} 441 for field_name, field_info in self.__class__.model_fields.items(): 442 if field_name == "case": 443 continue 444 field_annotation = get_args(field_info.annotation) 445 field_value = getattr(self, field_name, None) 446 447 if not field_value or not field_annotation: 448 continue 449 450 # Handle single ObjectReference 451 if ObjectReference in field_annotation: 452 relation_name = name_mapping.get(field_name, default_name or field_name) 453 relation = EntityRelation( 454 name=relation_name, 455 obj_from=self, 456 obj_to=field_value, 457 ) 458 relations[str(relation.id)] = relation 459 460 # Handle List[ObjectReference] 461 elif List[ObjectReference] in field_annotation: 462 for object_reference in field_value: 463 relation_name = name_mapping.get(field_name, default_name or field_name) 464 relation = EntityRelation( 465 name=relation_name, 466 obj_from=self, 467 obj_to=object_reference, 468 ) 469 relations[str(relation.id)] = relation 470 471 return relations
472
[docs] 473 def add_tags(self, tags: Optional[List[str]]): 474 if not tags or not hasattr(self, "attributes"): 475 return 476 entity_attributes = getattr(self, "attributes", {}) or {} 477 entity_tags = set(entity_attributes.get("tags", "").split(",") or []) 478 for tag in tags: 479 if tag and tag not in entity_tags: 480 entity_tags.add(tag) 481 if "" in entity_tags: 482 entity_tags.remove("") 483 entity_attributes["tags"] = ",".join(entity_tags) 484 self.attributes = entity_attributes
485
[docs] 486 def add_attributes(self, attributes: Dict[str, str]): 487 if not attributes or not hasattr(self, "attributes"): 488 return 489 entity_attributes = getattr(self, "attributes", {}) or {} 490 entity_attributes.update(attributes) 491 self.attributes = entity_attributes
492 493 def __hash__(self) -> int: 494 return hash(self.id)
495 496
[docs] 497class EntityRelation(ColanderType): 498 """EntityRelation represents a relationship between two entities in the model. 499 500 This class is used to define and manage relationships between objects, such as associations 501 between observables, devices, or actors. 502 503 Example: 504 >>> obs1 = Observable( 505 ... id=uuid4(), 506 ... name='1.1.1.1', 507 ... type=ObservableTypes.IPV4.value 508 ... ) 509 >>> obs2 = Observable( 510 ... id=uuid4(), 511 ... name='8.8.8.8', 512 ... type=ObservableTypes.IPV4.value 513 ... ) 514 >>> relation = EntityRelation( 515 ... id=uuid4(), 516 ... name='connection', 517 ... obj_from=obs1, 518 ... obj_to=obs2 519 ... ) 520 >>> print(relation.name) 521 connection 522 """ 523 524 id: UUID4 = Field(frozen=True, default_factory=lambda: uuid4()) 525 """The unique identifier for the entity relation.""" 526 527 created_at: datetime = Field(default=datetime.now(UTC), frozen=True) 528 """The timestamp when the entity relation was created.""" 529 530 updated_at: datetime = Field(default=datetime.now(UTC)) 531 """The timestamp when the entity relation was last updated.""" 532 533 name: str = Field(..., min_length=1, max_length=512) 534 """The name of the entity relation.""" 535 536 case: Optional[Case] | Optional[ObjectReference] = None 537 """Reference to the case this relation belongs to.""" 538 539 attributes: Optional[Dict[str, str]] = None 540 """Dictionary of additional attributes for the relation.""" 541 542 obj_from: EntityTypes | ObjectReference = Field(...) 543 """The source entity or reference in the relation.""" 544 545 obj_to: EntityTypes | ObjectReference = Field(...) 546 """The target entity or reference in the relation.""" 547
[docs] 548 def touch(self): 549 """Touch this relation's attributes.""" 550 self.updated_at = datetime.now(UTC)
551 552
[docs] 553class Actor(Entity): 554 """ 555 Actor represents an individual or group involved in an event, activity, or system. 556 557 This class extends the Entity base class and includes additional fields specific to actors. 558 559 Example: 560 >>> actor_type = ActorTypes.INDIVIDUAL.value 561 >>> actor = Actor( 562 ... name='John Doe', 563 ... type=actor_type 564 ... ) 565 >>> print(actor.name) 566 John Doe 567 """ 568 569 type: ActorType 570 """The type definition for the actor.""" 571 572 colander_internal_type: Literal["actor"] = "actor" 573 """Internal type discriminator for (de)serialization.""" 574 575 attributes: Optional[Dict[str, str]] = None 576 """Dictionary of additional attributes for the device."""
577 578
[docs] 579class Device(Entity): 580 """ 581 Device represents a physical or virtual device in Colander. 582 583 This class extends the Entity base class and includes additional fields specific to devices, 584 such as their type, attributes, and the actor operating the device. 585 586 Example: 587 >>> device_type = DeviceTypes.MOBILE.value 588 >>> actor = Actor(name='John Doe', type=ActorTypes.INDIVIDUAL.value) 589 >>> device = Device( 590 ... name="John's Phone", 591 ... type=device_type, 592 ... operated_by=actor, 593 ... attributes={'os': 'Android', 'version': '12'} 594 ... ) 595 >>> print(device.name) 596 John's Phone 597 """ 598 599 type: DeviceType 600 """The type definition for the device.""" 601 602 attributes: Optional[Dict[str, str]] = None 603 """Dictionary of additional attributes for the device.""" 604 605 operated_by: Optional[Actor] | Optional[ObjectReference] = None 606 """Reference to the actor operating the device.""" 607 608 colander_internal_type: Literal["device"] = "device" 609 """Internal type discriminator for (de)serialization."""
610 611
[docs] 612class Artifact(Entity): 613 """ 614 Artifact represents a file or data object, such as a document, image, or binary, within the system. 615 616 This class extends the Entity base class and includes additional fields specific to artifacts, 617 such as type, attributes, extraction source, file metadata, and cryptographic hashes. 618 619 Example: 620 >>> artifact_type = ArtifactTypes.DOCUMENT.value 621 >>> device_type = DeviceTypes.LAPTOP.value 622 >>> device = Device(name='Analyst Laptop', type=device_type) 623 >>> artifact = Artifact( 624 ... name='malware_sample.pdf', 625 ... type=artifact_type, 626 ... extracted_from=device, 627 ... extension='pdf', 628 ... original_name='invoice.pdf', 629 ... mime_type='application/pdf', 630 ... md5='d41d8cd98f00b204e9800998ecf8427e', 631 ... sha1='da39a3ee5e6b4b0d3255bfef95601890afd80709', 632 ... sha256='e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855', 633 ... size_in_bytes=12345 634 ... ) 635 >>> print(artifact.name) 636 malware_sample.pdf 637 """ 638 639 type: ArtifactType 640 """The type definition for the artifact.""" 641 642 attributes: Optional[Dict[str, str]] = None 643 """Dictionary of additional attributes for the artifact.""" 644 645 extracted_from: Optional[Device] | Optional[ObjectReference] = None 646 """Reference to the device from which this artifact was extracted.""" 647 648 extension: str | None = None 649 """The file extension of the artifact, if applicable.""" 650 651 original_name: str | None = None 652 """The original name of the artifact before ingestion.""" 653 654 mime_type: str | None = None 655 """The MIME type of the artifact.""" 656 657 detached_signature: str | None = None 658 """Optional detached signature for the artifact.""" 659 660 md5: str | None = None 661 """MD5 hash of the artifact.""" 662 663 sha1: str | None = None 664 """SHA1 hash of the artifact.""" 665 666 sha256: str | None = None 667 """SHA256 hash of the artifact.""" 668 669 size_in_bytes: NonNegativeInt = 0 670 """The size of the artifact in bytes.""" 671 672 colander_internal_type: Literal["artifact"] = "artifact" 673 """Internal type discriminator for (de)serialization."""
674 675
[docs] 676class DataFragment(Entity): 677 """ 678 DataFragment represents a fragment of data, such as a code snippet, text, or other content. 679 680 This class extends the Entity base class and includes additional fields specific to data fragments, 681 such as their type, content, and the artifact from which they were extracted. 682 683 Example: 684 >>> data_fragment_type = DataFragmentTypes.CODE.value 685 >>> artifact = Artifact( 686 ... name='example_artifact', 687 ... type=ArtifactTypes.DOCUMENT.value 688 ... ) 689 >>> data_fragment = DataFragment( 690 ... name='Sample Code', 691 ... type=data_fragment_type, 692 ... content='print("Hello, World!")', 693 ... extracted_from=artifact 694 ... ) 695 >>> print(data_fragment.content) 696 print("Hello, World!") 697 """ 698 699 type: DataFragmentType 700 """The type definition for the data fragment.""" 701 702 content: str | None = None 703 """The content of the data fragment.""" 704 705 extracted_from: Optional[Artifact] | Optional[ObjectReference] = None 706 """Reference to the artifact from which this data fragment was extracted.""" 707 708 colander_internal_type: Literal["datafragment"] = "datafragment" 709 """Internal type discriminator for (de)serialization."""
710 711
[docs] 712class Threat(Entity): 713 """ 714 Threat represents a threat entity, such as a malware family, campaign, or adversary. 715 716 This class extends the Entity base class and includes a type field for threat classification. 717 718 Example: 719 >>> threat_type = ThreatTypes.TROJAN.value 720 >>> threat = Threat( 721 ... name='Emotet', 722 ... type=threat_type 723 ... ) 724 >>> print(threat.name) 725 Emotet 726 """ 727 728 type: ThreatType 729 """The type definition for the threat.""" 730 731 colander_internal_type: Literal["threat"] = "threat" 732 """Internal type discriminator for (de)serialization."""
733 734
[docs] 735class Observable(Entity): 736 """ 737 Observable represents an entity that can be observed or detected within the system. 738 739 This class extends the Entity base class and includes additional fields specific to observables, 740 such as classification, raw value, extraction source, associated threat, and operator. 741 742 Example: 743 >>> ot = ObservableTypes.IPV4.value 744 >>> obs = Observable( 745 ... name='1.2.3.4', 746 ... type=ot, 747 ... classification='malicious', 748 ... raw_value='1.2.3.4', 749 ... attributes={'asn': 'AS123'} 750 ... ) 751 >>> print(obs.name) 752 1.2.3.4 753 """ 754 755 type: ObservableType = Field(...) 756 """The type definition for the observable.""" 757 758 attributes: Optional[Dict[str, str]] = None 759 """Dictionary of additional attributes for the observable.""" 760 761 classification: str | None = Field(default=None, max_length=512) 762 """Optional classification label for the observable.""" 763 764 raw_value: str | None = None 765 """The raw value associated with the observable.""" 766 767 extracted_from: Optional[Artifact] | Optional[ObjectReference] = None 768 """Reference to the artifact from which this observable was extracted.""" 769 770 associated_threat: Optional[Threat] | Optional[ObjectReference] = None 771 """Reference to an associated threat.""" 772 773 operated_by: Optional[Actor] | Optional[ObjectReference] = None 774 """Reference to the actor operating this observable.""" 775 776 colander_internal_type: Literal["observable"] = "observable" 777 """Internal type discriminator for (de)serialization."""
778 779
[docs] 780class DetectionRule(Entity): 781 """ 782 DetectionRule represents a rule used for detecting specific content or logic related to observables or 783 object references. 784 785 This class is designed to encapsulate detection rules that can be applied across various systems or platforms to 786 identify patterns or conditions defined by the user. 787 788 Example: 789 >>> drt = DetectionRuleTypes.YARA.value 790 >>> rule = DetectionRule( 791 ... name='Detect Malicious IP', 792 ... type=drt, 793 ... content='rule malicious_ip { condition: true }', 794 ... ) 795 >>> print(rule.name) 796 Detect Malicious IP 797 """ 798 799 type: DetectionRuleType 800 """The type definition for the detection rule.""" 801 802 content: str | None = None 803 """The content or logic of the detection rule.""" 804 805 targeted_observables: Optional[List[Observable]] | Optional[List[ObjectReference]] = None 806 """List of observables or references targeted by this detection rule.""" 807 808 colander_internal_type: Literal["detectionrule"] = "detectionrule" 809 """Internal type discriminator for (de)serialization."""
810 811
[docs] 812class Event(Entity): 813 """ 814 Event represents an occurrence or activity observed within a system, such as a detection, alert, or log entry. 815 816 This class extends the Entity base class and includes additional fields specific to events, 817 such as timestamps, count, involved observables, and references to related entities. 818 819 Example: 820 >>> et = EventTypes.HIT.value 821 >>> obs_type = ObservableTypes.IPV4.value 822 >>> obs = Observable( 823 ... id=uuid4(), 824 ... name='8.8.8.8', 825 ... type=obs_type 826 ... ) 827 >>> event = Event( 828 ... name='Suspicious Connection', 829 ... type=et, 830 ... first_seen=datetime(2024, 6, 1, 12, 0, tzinfo=UTC), 831 ... last_seen=datetime(2024, 6, 1, 12, 5, tzinfo=UTC), 832 ... involved_observables=[obs] 833 ... ) 834 >>> print(event.name) 835 Suspicious Connection 836 """ 837 838 type: EventType 839 """The type definition for the event.""" 840 841 attributes: Optional[Dict[str, str]] = None 842 """Dictionary of additional attributes for the event.""" 843 844 first_seen: datetime = datetime.now(UTC) 845 """The timestamp when the event was first observed.""" 846 847 last_seen: datetime = datetime.now(UTC) 848 """The timestamp when the event was last observed.""" 849 850 count: PositiveInt = 1 851 """The number of times this event was observed.""" 852 853 extracted_from: Optional[Artifact] | Optional[ObjectReference] = None 854 """Reference to the artifact from which this event was extracted.""" 855 856 observed_on: Optional[Device] | Optional[ObjectReference] = None 857 """Reference to the device on which this event was observed.""" 858 859 detected_by: Optional[DetectionRule] | Optional[ObjectReference] = None 860 """Reference to the detection rule that detected this event.""" 861 862 # ToDo: missing attribute in Colander implementation 863 attributed_to: Optional[Actor] | Optional[ObjectReference] = None 864 """Reference to the actor attributed to this event.""" 865 866 # ToDo: missing attribute in Colander implementation 867 target: Optional[Actor] | Optional[ObjectReference] = None 868 """Reference to the actor targeted during this event.""" 869 870 involved_observables: List[Observable] | List[ObjectReference] = [] 871 """List of observables or references involved in this event.""" 872 873 colander_internal_type: Literal["event"] = "event" 874 """Internal type discriminator for (de)serialization.""" 875 876 @model_validator(mode="after") 877 def _check_dates(self) -> Any: 878 if self.first_seen > self.last_seen: 879 raise ValueError("first_seen must be before last_seen") 880 return self
881 882
[docs] 883class ColanderRepository(object, metaclass=Singleton): 884 """Singleton repository for managing and storing Case, Entity, and EntityRelation objects. 885 886 This class provides centralized storage and reference management for all model instances, 887 supporting insertion, lookup, and reference resolution/unlinking. 888 """ 889 890 cases: Dict[str, Case] 891 entities: Dict[str, EntityTypes] 892 relations: Dict[str, EntityRelation] 893
[docs] 894 def __init__(self): 895 """Initializes the repository with empty dictionaries for cases, entities, and relations.""" 896 self.cases = LRUDict() 897 self.entities = LRUDict() 898 self.relations = LRUDict()
899
[docs] 900 def clear(self): 901 self.cases.clear() 902 self.entities.clear() 903 self.relations.clear()
904
[docs] 905 def __lshift__(self, other: EntityTypes | Case) -> None: 906 """Inserts an object into the appropriate repository dictionary. 907 908 Args: 909 other: The object (Entity, EntityRelation, or Case) to insert. 910 """ 911 if isinstance(other, Entity): 912 self.entities[str(other.id)] = other 913 elif isinstance(other, EntityRelation): 914 self.relations[str(other.id)] = other 915 elif isinstance(other, Case): 916 self.cases[str(other.id)] = other
917
[docs] 918 def __rshift__(self, other: str | UUID4) -> EntityTypes | EntityRelation | Case | str | UUID4: 919 """Retrieves an object by its identifier from entities, relations, or cases. 920 921 Args: 922 other: The string or UUID identifier to look up. 923 924 Returns: 925 The found object or the identifier if not found. 926 """ 927 _other = str(other) 928 if _other in self.entities: 929 return self.entities[_other] 930 elif _other in self.relations: 931 return self.relations[_other] 932 elif _other in self.cases: 933 return self.cases[_other] 934 return other
935 944
[docs] 945 def resolve_references(self): 946 """Resolves all UUID references in entities, relations, and cases to their corresponding objects.""" 947 for _, entity in self.entities.items(): 948 entity.resolve_references() 949 for _, relation in self.relations.items(): 950 relation.resolve_references() 951 for _, case in self.cases.items(): 952 case.resolve_references()
953 954
[docs] 955class ColanderFeed(ColanderType): 956 """ColanderFeed aggregates entities, relations, and cases for bulk operations or data exchange. 957 958 This class is used to load, manage, and resolve references for collections of model objects. 959 960 Example: 961 >>> feed_data = { 962 ... "entities": { 963 ... "204d4590-a3ee-4f24-8eaf-350ec2fa751b": { 964 ... "id": "204d4590-a3ee-4f24-8eaf-350ec2fa751b", 965 ... "name": "Example Observable", 966 ... "type": {"name": "IPv4", "short_name": "IPV4"}, 967 ... "super_type": {"short_name": "observable"}, 968 ... "colander_internal_type": "observable" 969 ... } 970 ... }, 971 ... "relations": {}, 972 ... "cases": {} 973 ... } 974 >>> feed = ColanderFeed.load(feed_data) 975 >>> print(list(feed.entities.keys())) 976 ['204d4590-a3ee-4f24-8eaf-350ec2fa751b'] 977 """ 978 979 id: UUID4 = Field(frozen=True, default_factory=lambda: uuid4()) 980 """The unique identifier for the feed.""" 981 982 name: str = "" 983 """Optional name of the feed.""" 984 985 description: str = "" 986 """Optional description of the feed.""" 987 988 entities: Optional[Dict[str, EntityTypes]] = {} 989 """Dictionary of entity objects, keyed by their IDs.""" 990 991 relations: Optional[Dict[str, EntityRelation]] = {} 992 """Dictionary of entity relations, keyed by their IDs.""" 993 994 cases: Optional[Dict[str, Case]] = {} 995 """Dictionary of case objects, keyed by their IDs.""" 996
[docs] 997 @staticmethod 998 def load(raw_object: dict, reset_ids=False, resolve_types=True) -> "ColanderFeed": 999 """Loads an EntityFeed from a raw object, which can be either a dictionary or a list. 1000 1001 Args: 1002 raw_object: The raw data representing the entities and relations to be loaded into the EntityFeed. 1003 reset_ids: If true, resets the ids of the entities and relations to their values. 1004 resolve_types: If True, resolves entity types based on the types enum. Mandatory to find similar entities. 1005 1006 Returns: 1007 The EntityFeed loaded from a raw object. 1008 1009 Raises: 1010 ValueError: If there are inconsistencies in entity IDs or relations. 1011 """ 1012 ColanderRepository().clear() 1013 1014 if "entities" in raw_object: 1015 for entity_id, entity in raw_object["entities"].items(): 1016 if entity_id != entity.get("id"): 1017 raise ValueError(f"Relation {entity_id} does not match with the ID of {entity}") 1018 entity["colander_internal_type"] = entity["super_type"]["short_name"].lower() 1019 if "relations" in raw_object: 1020 for relation_id, relation in raw_object["relations"].items(): 1021 if relation_id != relation.get("id"): 1022 raise ValueError(f"Relation {relation_id} does not match with the ID of {relation}") 1023 if ( 1024 "obj_from" not in relation 1025 and "obj_to" not in relation 1026 and "obj_from_id" in relation 1027 and "obj_to_id" in relation 1028 ): 1029 relation["obj_from"] = relation["obj_from_id"] 1030 relation["obj_to"] = relation["obj_to_id"] 1031 1032 if reset_ids: 1033 # feed_objects = raw_object 1034 entities = {} 1035 relations = {} 1036 rewrite_ids = {} 1037 for e in raw_object["entities"].keys(): 1038 rewrite_ids[e] = str(uuid4()) 1039 for e in raw_object["relations"].keys(): 1040 rewrite_ids[e] = str(uuid4()) 1041 for entity in raw_object["entities"].values(): 1042 for k, v in entity.items(): 1043 if isinstance(v, str): 1044 entity[k] = rewrite_ids.get(v, v) 1045 if isinstance(v, list): 1046 entity[k] = [rewrite_ids.get(value, value) for value in v] 1047 entities[entity["id"]] = entity 1048 for relation in raw_object["relations"].values(): 1049 for k, v in relation.items(): 1050 if isinstance(v, str): 1051 relation[k] = rewrite_ids.get(v, v) 1052 relations[relation["id"]] = relation 1053 raw_object["entities"] = entities 1054 raw_object["relations"] = relations 1055 1056 entity_feed = ColanderFeed.model_validate(raw_object) 1057 if resolve_types: 1058 entity_feed.resolve_types() 1059 entity_feed.resolve_references() 1060 return entity_feed
1061
[docs] 1062 def resolve_types(self): 1063 for entity_id, entity in self.entities.items(): 1064 super_type = CommonEntitySuperTypes.by_short_name(entity.super_type.short_name) 1065 entity.type = super_type.types_class.by_short_name(entity.type.short_name)
1066
[docs] 1067 def resolve_references(self, strict=False): 1068 """Resolves references within entities, relations, and cases. 1069 1070 Iterates over each entity, relation, and case within the respective collections, calling their 1071 `resolve_references` method to update them with any referenced data. This helps in synchronizing 1072 internal state with external dependencies or updates. 1073 1074 Args: 1075 strict: If True, raises a ValueError when a UUID reference cannot be resolved. 1076 If False, unresolved references remain as UUIDs. 1077 """ 1078 for _, entity in self.entities.items(): 1079 entity.resolve_references(strict=strict) 1080 for _, relation in self.relations.items(): 1081 relation.resolve_references(strict=strict) 1082 for _, case in self.cases.items(): 1083 case.resolve_references(strict=strict)
1084 1099
[docs] 1100 def contains(self, obj: Any) -> bool: 1101 """Check if an object exists in the current feed by its identifier. 1102 1103 This method determines whether a given object (or its identifier) exists 1104 within any of the feed's collections: entities, relations, or cases. 1105 It extracts the object's ID and searches across all three collections. 1106 1107 Args: 1108 obj: The object to check for existence. Can be: 1109 1110 - An entity, relation, or case object with an 'id' attribute 1111 - A string or UUID representing an object ID 1112 - Any object that get_id can process 1113 1114 Returns: 1115 True if the object exists in entities, relations, or cases; 1116 False otherwise 1117 1118 Example: 1119 >>> feed = ColanderFeed() 1120 >>> obs = Observable(name="test", type=ObservableTypes.IPV4.value) 1121 >>> feed.entities[str(obs.id)] = obs 1122 >>> feed.contains(obs) 1123 True 1124 >>> feed.contains("nonexistent-id") 1125 False 1126 """ 1127 object_id = str(get_id(obj)) 1128 if not object_id: 1129 return False 1130 1131 if object_id in self.entities: 1132 return True 1133 if object_id in self.relations: 1134 return True 1135 if object_id in self.cases: 1136 return True 1137 1138 return False
1139
[docs] 1140 def add(self, obj: Union[Case, EntityTypes, EntityRelation]): 1141 """ 1142 Adds an object to the feed's collection. 1143 1144 Args: 1145 obj: The object to add. Can be a Case, EntityTypes, or EntityRelation. 1146 1147 This method inserts the object into the appropriate dictionary (entities, relations, or cases) 1148 based on its type, using its stringified ID as the key. If the object already exists, it is not overwritten. 1149 """ 1150 if isinstance(obj, Entity): 1151 self.entities.setdefault(str(obj.id), obj) 1152 if isinstance(obj, EntityRelation): 1153 self.relations.setdefault(str(obj.id), obj) 1154 if isinstance(obj, Case): 1155 self.cases.setdefault(str(obj.id), obj)
1156
[docs] 1157 def get(self, obj: Any) -> Optional[Union[Case, EntityTypes, EntityRelation]]: 1158 """Retrieve an object from the feed by its identifier. 1159 1160 This method searches for an object across all feed collections (entities, relations, cases) 1161 using the object's ID. It first checks if the object exists using the contains() method, 1162 then attempts to retrieve it from the appropriate collection. 1163 1164 Args: 1165 obj: The object to retrieve. Can be: 1166 1167 - An entity, relation, or case object with an 'id' attribute 1168 - A string or UUID representing an object ID 1169 - Any object that get_id can process 1170 1171 Returns: 1172 The found object if it exists in any of the collections (entities, relations, or cases), otherwise None. 1173 """ 1174 if not self.contains(obj): 1175 return None 1176 1177 object_id = str(get_id(obj)) 1178 1179 if object_id in self.entities: 1180 return self.entities.get(object_id) 1181 if object_id in self.relations: 1182 return self.relations.get(object_id) 1183 if object_id in self.cases: 1184 return self.cases.get(object_id) 1185 1186 return None
1187
[docs] 1188 def get_by_super_type(self, super_type: "CommonEntitySuperType") -> List[EntityTypes]: 1189 """ 1190 Returns a list of entities matching the given super type. 1191 1192 Args: 1193 super_type: The CommonEntitySuperType to filter entities by. 1194 1195 Returns: 1196 A list of entities that are instances of the specified super type's model class. 1197 """ 1198 entities = [] 1199 for _, entity in self.entities.items(): 1200 if isinstance(entity, super_type.model_class): 1201 entities.append(entity) 1202 return entities
1203
[docs] 1204 def remove_relation_duplicates(self): 1205 """ 1206 Remove duplicate EntityRelation objects from the repository. 1207 1208 Iterates over all stored relations and identifies duplicates by comparing 1209 the `name`, `obj_from`, and `obj_to` properties. If two distinct relation 1210 instances are semantically identical, the latter discovered instance is 1211 scheduled for removal. 1212 """ 1213 duplicates = [] 1214 for relation_a in self.relations.values(): 1215 for relation_b in self.relations.values(): 1216 if ( 1217 relation_a != relation_b 1218 and relation_a.name == relation_b.name 1219 and relation_a.obj_from == relation_b.obj_from 1220 and relation_a.obj_to == relation_b.obj_to 1221 ): 1222 duplicates.append(relation_b) 1223 for duplicate in duplicates: 1224 self.relations.pop(str(duplicate.id))
1225
[docs] 1226 def get_incoming_relations(self, entity: EntityTypes) -> Dict[str, EntityRelation]: 1227 """Retrieve all relations where the specified entity is the target (obj_to). 1228 1229 This method finds all entity relations in the feed where the given entity 1230 is the destination or target of the relationship. Only fully resolved 1231 relations are considered to ensure data consistency. 1232 1233 Args: 1234 entity: The entity to find incoming relations for. Must be an instance of Entity. 1235 1236 Returns: 1237 A dictionary mapping relation IDs to EntityRelation objects where the entity is the target (obj_to). 1238 """ 1239 assert isinstance(entity, Entity) 1240 relations = {} 1241 for relation_id, relation in self.relations.items(): 1242 if not relation.is_fully_resolved(): 1243 continue 1244 if relation.obj_to == entity: 1245 relations[relation_id] = relation 1246 return relations
1247
[docs] 1248 def get_outgoing_relations(self, entity: EntityTypes, exclude_immutables=True) -> Dict[str, EntityRelation]: 1249 """Retrieve all relations where the specified entity is the source (obj_from). 1250 1251 This method finds all entity relations in the feed where the given entity 1252 is the source or origin of the relationship. Only fully resolved 1253 relations are considered to ensure data consistency. 1254 1255 Args: 1256 entity: The entity to find outgoing relations for. Must be an instance of Entity. 1257 exclude_immutables: If True, exclude immutable relations. 1258 1259 Returns: 1260 A dictionary mapping relation IDs to EntityRelation objects where the entity is the source (obj_from). 1261 """ 1262 relations = {} 1263 if not exclude_immutables: 1264 for _, entity in self.entities.items(): 1265 relations.update(entity.get_immutable_relations()) 1266 for relation_id, relation in self.relations.items(): 1267 if not relation.is_fully_resolved(): 1268 continue 1269 if relation.obj_from == entity: 1270 relations[relation_id] = relation 1271 return relations
1272
[docs] 1273 def get_relations(self, entity: EntityTypes, exclude_immutables=True) -> Dict[str, EntityRelation]: 1274 """Retrieve all relations (both incoming and outgoing) for the specified entity. 1275 1276 This method combines the results of get_incoming_relations() and 1277 get_outgoing_relations() to provide a complete view of all relationships 1278 involving the specified entity, regardless of direction. 1279 1280 Args: 1281 entity: The entity to find all relations for. Must be an instance of Entity. 1282 exclude_immutables: If True, exclude immutable relations. 1283 1284 Returns: 1285 A dictionary mapping relation IDs to EntityRelation objects where the entity 1286 is either the source (obj_from) or target (obj_to). 1287 """ 1288 assert isinstance(entity, Entity) 1289 1290 relations = {} 1291 relations.update(self.get_incoming_relations(entity)) 1292 relations.update(self.get_outgoing_relations(entity, exclude_immutables=exclude_immutables)) 1293 1294 return relations
1295
[docs] 1296 def get_entities_similar_to(self, entity: EntityTypes) -> Dict[str, EntityTypes]: 1297 """Find entities in the feed that are similar to the given entity. 1298 1299 This method searches through all entities in the feed to find those that match 1300 specific criteria based on the entity type. The similarity criteria include: 1301 1302 - Same entity type and name for all entities 1303 - For Artifacts: matching SHA256 hash (if available) 1304 - For DataFragments: matching content 1305 - For DetectionRules: matching content 1306 - For Events: matching first_seen and last_seen timestamps 1307 1308 Args: 1309 entity: The entity to find similar matches for. Must be an 1310 instance of one of the supported entity types (Actor, Artifact, 1311 DataFragment, DetectionRule, Device, Event, Observable, Threat). 1312 1313 Returns: 1314 A dictionary mapping entity IDs to 1315 EntityTypes objects that match the similarity criteria. Returns an empty 1316 dictionary if no similar entities are found, or None if the input entity 1317 is invalid. 1318 1319 Note: 1320 - For Artifact entities, the SHA256 hash must be present in the input entity 1321 for comparison to occur 1322 - The method performs exact matches on all criteria - no fuzzy matching 1323 - Entity type comparison uses the entity's type attribute for matching 1324 """ 1325 candidates: Dict[str, EntityTypes] = {} 1326 1327 for feed_entity_id, feed_entity in self.entities.items(): 1328 match = feed_entity.type == entity.type 1329 if not match: 1330 continue 1331 match &= feed_entity.name == entity.name 1332 if isinstance(entity, CommonEntitySuperTypes.ARTIFACT.value.model_class): 1333 match &= entity.sha256 is not None 1334 match &= feed_entity.sha256 == entity.sha256 1335 if isinstance(entity, CommonEntitySuperTypes.DATA_FRAGMENT.value.model_class): 1336 match &= feed_entity.content == entity.content 1337 if isinstance(entity, CommonEntitySuperTypes.DETECTION_RULE.value.model_class): 1338 match &= feed_entity.content == entity.content 1339 if isinstance(entity, CommonEntitySuperTypes.EVENT.value.model_class): 1340 match &= feed_entity.first_seen == entity.first_seen 1341 match &= feed_entity.last_seen == entity.last_seen 1342 if match: 1343 candidates[feed_entity_id] = feed_entity 1344 1345 return candidates
1346
[docs] 1347 def filter( 1348 self, 1349 maximum_tlp_level: TlpPapLevel, 1350 include_relations=True, 1351 include_cases=True, 1352 exclude_entity_types: Optional[List[EntityTypes]] = None, 1353 ) -> "ColanderFeed": 1354 """Filter the feed based on TLP (Traffic Light Protocol) level and optionally include relations and cases. 1355 1356 This method creates a new ColanderFeed containing only entities whose TLP level is below 1357 the specified maximum threshold. It can optionally include relations between filtered 1358 entities and cases associated with the filtered entities. 1359 1360 Args: 1361 maximum_tlp_level: The maximum TLP level threshold. Only entities 1362 with TLP levels strictly below this value will be included. 1363 include_relations: If True, includes relations where both 1364 source and target entities are present in the filtered feed. Defaults to True. 1365 include_cases: If True, includes cases associated with the filtered entities. Defaults to True. 1366 exclude_entity_types: If provided, entities of these types are excluded. 1367 1368 Returns: 1369 A new filtered feed containing entities, relations, and cases that meet the 1370 specified criteria. 1371 """ 1372 assert isinstance(maximum_tlp_level, TlpPapLevel) 1373 1374 excluded_types = exclude_entity_types or [] 1375 1376 self.resolve_references() 1377 filtered = ColanderFeed(name=self.name, description=self.description) 1378 1379 for entity_id, entity in self.entities.items(): 1380 if entity.tlp.value < maximum_tlp_level.value and type(entity) not in excluded_types: 1381 filtered.entities[entity_id] = entity 1382 1383 for entity_id, entity in filtered.entities.items(): 1384 # Only include relations of the entity 1385 if include_relations: 1386 for relation_id, relation in self.get_relations(entity).items(): 1387 if filtered.contains(relation.obj_from) and filtered.contains(relation.obj_to): 1388 filtered.relations[relation_id] = relation 1389 # Only include the case associated with the entity 1390 if include_cases: 1391 if (case := self.get(entity.case)) is not None and case.tlp.value < maximum_tlp_level.value: 1392 filtered.cases[str(case.id)] = case 1393 1394 filtered.resolve_references() 1395 return filtered
1396
[docs] 1397 def overwrite_case(self, case: Case): 1398 """ 1399 Overwrites the case for all entities and relations in the feed. 1400 This method updates the case reference for all entities and relations in the feed 1401 to the provided case object. The case is also added to the feed's case dictionary. 1402 This is useful when you want to reassign all feed contents to a specific case. 1403 1404 Args: 1405 case: The Case object to assign to all entities and relations in the feed. 1406 """ 1407 self.cases[str(case.id)] = case 1408 for _, entity in self.entities.items(): 1409 entity.case = case 1410 for _, relation in self.relations.items(): 1411 relation.case = case
1412
[docs] 1413 def define_arbitrary_property(self, property_name, value: Any): 1414 """ 1415 Defines an arbitrary property on all cases, entities, and relations in the feed. 1416 1417 Args: 1418 property_name: The name of the property to define. 1419 value: The value to assign to the property. 1420 """ 1421 for _, case in self.cases.items(): 1422 case.define_arbitrary_property(property_name, value) 1423 for _, entity in self.entities.items(): 1424 entity.define_arbitrary_property(property_name, value) 1425 for _, relation in self.relations.items(): 1426 relation.define_arbitrary_property(property_name, value)
1427
[docs] 1428 def break_immutable_relations(self): 1429 """ 1430 Breaks immutable relations by converting object references to explicit relations. 1431 This method iterates through all entities in the feed and converts their immutable 1432 reference fields (those annotated with ObjectReference or List[ObjectReference]) 1433 into explicit EntityRelation objects. The original reference fields are then 1434 cleared (set to None for single references or empty list for list references). 1435 1436 This is useful for creating a fully explicit representation of relationships 1437 where all connections are represented as EntityRelation objects rather than 1438 embedded object references. 1439 1440 Note: 1441 This method modifies the feed in-place by: 1442 1443 - Adding new EntityRelation objects to the relation dictionary 1444 - Clearing the original reference fields on entities 1445 """ 1446 for _, entity in self.entities.items(): 1447 for _, immutable_relation in entity.get_immutable_relations().items(): 1448 self.relations[str(immutable_relation.id)] = immutable_relation 1449 object_reference = getattr(entity, immutable_relation.name) 1450 if isinstance(object_reference, list): 1451 setattr(entity, immutable_relation.name, []) 1452 else: 1453 setattr(entity, immutable_relation.name, None)
1454
[docs] 1455 def rebuild_immutable_relations(self): 1456 """ 1457 Rebuilds immutable relations by restoring object references from explicit relations. 1458 This method iterates through all entities and their outgoing relations (excluding immutables) 1459 and attempts to restore the original immutable reference fields by setting the appropriate 1460 entity attributes. After successfully restoring a reference, the explicit relation is removed 1461 from the relation dictionary to avoid duplication. 1462 1463 The method handles both single object references and list-based references: 1464 1465 - For list fields: Appends the target object if not already present 1466 - For single fields: Sets the target object if the field is currently None 1467 - For existing matches: Removes the redundant explicit relation 1468 1469 This is typically used after breaking immutable relations to restore the original 1470 entity structure while cleaning up temporary explicit relations. 1471 1472 Note: 1473 This method modifies the feed in-place by updating entity attributes and 1474 removing relations from the relation dictionary. 1475 """ 1476 for _, entity in self.entities.items(): 1477 for _, relation in self.get_outgoing_relations(entity, exclude_immutables=True).items(): 1478 obj_from = relation.obj_from 1479 obj_to = relation.obj_to 1480 if not hasattr(obj_from, relation.name): 1481 continue 1482 actual = getattr(obj_from, relation.name, None) 1483 field_info = obj_from.__class__.model_fields[relation.name] 1484 annotation_args = get_args(field_info.annotation) or [] # type: ignore[var-annotated] 1485 obj_to_type = type(obj_to) 1486 if List[obj_to_type] in annotation_args: 1487 if obj_to not in actual: 1488 actual.append(obj_to) 1489 setattr(obj_from, relation.name, actual) 1490 if obj_to in actual: 1491 self.relations.pop(str(relation.id)) 1492 elif obj_to_type in annotation_args: 1493 if actual is None: 1494 setattr(obj_from, relation.name, obj_to) 1495 if obj_to == getattr(obj_from, relation.name, None): 1496 self.relations.pop(str(relation.id))
1497 1498
[docs] 1499class CommonEntitySuperType(BaseModel): 1500 """ 1501 CommonEntitySuperType defines metadata for a super type of entities in the Colander data model. 1502 1503 This class is used to represent high-level categories of entities (such as Actor, Artifact, Device, etc.) 1504 and provides fields for the short name, display name, associated types, and the Python class 1505 implementing the entity. 1506 """ 1507 1508 model_config: ConfigDict = ConfigDict(str_strip_whitespace=True, arbitrary_types_allowed=True, from_attributes=True) 1509 1510 short_name: str = Field(frozen=True, max_length=32) 1511 """A short name for the model type.""" 1512 1513 name: str = Field(frozen=True, max_length=512) 1514 """The name of the model type.""" 1515 1516 types: Optional[List[object]] = Field(default=None, exclude=True) 1517 """Optional reference to the enum or collection of supported types.""" 1518 1519 model_class: Any = Field(default=None, exclude=True) 1520 """The Python class associated with this super type (Observable...).""" 1521 1522 type_class: Any = Field(default=None, exclude=True) 1523 """The Python class associated with the entity type (ObservableType...).""" 1524 1525 types_class: Any = Field(default=None, exclude=True) 1526 """The Python class associated with the entity types (ObservableTypes...).""" 1527 1528 default_type: Any = Field(default=None, exclude=True) 1529 """The default entity type (GENERIC...).""" 1530
[docs] 1531 def type_by_short_name(self, short_name: str): 1532 for t in self.types: 1533 if hasattr(t, short_name.upper()): 1534 return getattr(t, short_name.upper()).value 1535 return self.default_type.value
1536 1537 def __str__(self): 1538 return self.short_name 1539 1540 def __repr__(self): 1541 return self.short_name
1542 1543
[docs] 1544class CommonEntitySuperTypes(enum.Enum): 1545 """ 1546 CommonEntitySuperTypes is an enumeration of all super types for entities in the Colander data model. 1547 1548 Each member of this enum represents a high-level entity category (such as Actor, Artifact, Device, etc.) 1549 and holds a CommonEntitySuperType instance containing metadata and references to the corresponding 1550 entity class and its supported types. 1551 1552 This enum is used for type resolution and validation across the model. 1553 1554 Example: 1555 >>> super_type = CommonEntitySuperTypes.ACTOR.value 1556 >>> print(super_type.name) 1557 Actor 1558 """ 1559 1560 ACTOR = CommonEntitySuperType( 1561 short_name="ACTOR", 1562 name="Actor", 1563 model_class=Actor, 1564 type_class=ActorType, 1565 types_class=ActorTypes, 1566 default_type=ActorTypes.default, 1567 types=[t for t in ActorTypes], 1568 ) 1569 ARTIFACT = CommonEntitySuperType( 1570 short_name="ARTIFACT", 1571 name="Artifact", 1572 model_class=Artifact, 1573 type_class=ArtifactType, 1574 types_class=ArtifactTypes, 1575 default_type=ArtifactTypes.default, 1576 types=[t for t in ArtifactTypes], 1577 ) 1578 DATA_FRAGMENT = CommonEntitySuperType( 1579 short_name="DATAFRAGMENT", 1580 name="Data fragment", 1581 model_class=DataFragment, 1582 type_class=DataFragmentType, 1583 types_class=DataFragmentTypes, 1584 default_type=DataFragmentTypes.default, 1585 types=[t for t in DataFragmentTypes], 1586 ) 1587 DETECTION_RULE = CommonEntitySuperType( 1588 short_name="DETECTIONRULE", 1589 name="Detection rule", 1590 model_class=DetectionRule, 1591 type_class=DetectionRuleType, 1592 types_class=DetectionRuleTypes, 1593 default_type=DetectionRuleTypes.default, 1594 types=[t for t in DetectionRuleTypes], 1595 ) 1596 DEVICE = CommonEntitySuperType( 1597 short_name="DEVICE", 1598 name="Device", 1599 model_class=Device, 1600 type_class=DeviceType, 1601 types_class=DeviceTypes, 1602 default_type=DeviceTypes.default, 1603 types=[t for t in DeviceTypes], 1604 ) 1605 EVENT = CommonEntitySuperType( 1606 short_name="EVENT", 1607 name="Event", 1608 model_class=Event, 1609 type_class=EventType, 1610 types_class=EventTypes, 1611 default_type=EventTypes.default, 1612 types=[t for t in EventTypes], 1613 ) 1614 OBSERVABLE = CommonEntitySuperType( 1615 short_name="OBSERVABLE", 1616 name="Observable", 1617 model_class=Observable, 1618 type_class=ObservableType, 1619 types_class=ObservableTypes, 1620 default_type=ObservableTypes.default, 1621 types=[t for t in ObservableTypes], 1622 ) 1623 THREAT = CommonEntitySuperType( 1624 short_name="THREAT", 1625 name="Threat", 1626 model_class=Threat, 1627 type_class=ThreatType, 1628 types_class=ThreatTypes, 1629 default_type=ThreatTypes.default, 1630 types=[t for t in ThreatTypes], 1631 ) 1632
[docs] 1633 @classmethod 1634 def by_short_name(cls, short_name: str) -> Optional[CommonEntitySuperType]: 1635 sn = short_name.replace(" ", "_").upper() 1636 for member in cls: 1637 if member.value.short_name == sn: 1638 return member.value 1639 return None