Source code for colander_data_converter.base.models

   1import abc
   2import enum
   3from datetime import datetime, UTC
   4from typing import List, Dict, Optional, Union, Annotated, Literal, get_args, Any
   5from uuid import uuid4, UUID
   6
   7from pydantic import (
   8    PositiveInt,
   9    NonNegativeInt,
  10    UUID4,
  11    BaseModel,
  12    AnyUrl,
  13    computed_field,
  14    model_validator,
  15    ConfigDict,
  16    Field,
  17)
  18
  19from colander_data_converter.base.common import (
  20    ObjectReference,
  21    TlpPapLevel,
  22    Singleton,
  23)
  24from colander_data_converter.base.types.actor import ActorType, ActorTypes
  25from colander_data_converter.base.types.artifact import ArtifactType, ArtifactTypes
  26from colander_data_converter.base.types.base import EntityType_T
  27from colander_data_converter.base.types.data_fragment import DataFragmentType, DataFragmentTypes
  28from colander_data_converter.base.types.detection_rule import DetectionRuleType, DetectionRuleTypes
  29from colander_data_converter.base.types.device import DeviceType, DeviceTypes
  30from colander_data_converter.base.types.event import EventType, EventTypes
  31from colander_data_converter.base.types.observable import ObservableType, ObservableTypes
  32from colander_data_converter.base.types.threat import ThreatType, ThreatTypes
  33
# Dotted import name of this module.
# NOTE(review): not referenced anywhere in the visible part of the file; presumably used
# as the anchor package for resource lookups elsewhere - confirm before removing.
resource_package = __name__
  35
  36
def get_id(obj: Any) -> Optional[UUID4]:
    """
    Extracts a UUID identifier from the given object.

    Args:
        obj: The object to extract the UUID from. Can be a string, UUID, or an object with an 'id' attribute
            (the 'id' attribute is itself processed recursively, so it may be a string or a UUID).

    Returns:
        Optional[UUID4]: The extracted UUID if available, otherwise None. The identifier is returned
        exactly as supplied; it is never rewritten to another UUID version.
    """
    if not obj:
        return None

    if isinstance(obj, UUID):
        return obj

    if isinstance(obj, str):
        try:
            # Deliberately no ``version=4`` here: that argument does not validate the version,
            # it overwrites the version/variant bits and would silently return a different UUID
            # than the one contained in the string.
            return UUID(obj)
        except ValueError:
            # Not a well-formed UUID string.
            return None

    nested_id = getattr(obj, "id", None)
    if nested_id is not None:
        return get_id(nested_id)

    return None
# Annotated union type representing all possible entity definitions in the model.
# This type is used for fields that can accept any of the defined entity classes.
# The Field discriminator 'colander_internal_type' is used for type resolution during (de)serialization.
# The members are written as string forward references because the entity classes
# are defined further down in this module.
EntityTypes = Annotated[
    Union[
        "Actor",
        "Artifact",
        "DataFragment",
        "Observable",
        "DetectionRule",
        "Device",
        "Event",
        "Threat",
    ],
    Field(discriminator="colander_internal_type"),
]
class ColanderType(BaseModel):
    """Base class for all Colander model data_types, providing common functionality.

    This class extends Pydantic's BaseModel and is intended to be subclassed by
    all model entities. It includes methods for linking and unlinking object references,
    resolving type hints, and extracting subclass information.
    """

    model_config = ConfigDict(
        str_strip_whitespace=True,
        arbitrary_types_allowed=True,
    )

    def model_post_init(self, __context):
        """Executes post-initialization logic for the model, ensuring the repository
        registers the current subclass instance.

        Args:
            __context (Any): Additional context provided for post-initialization handling.
        """
        repository = ColanderRepository()
        repository << self

    def _process_reference_fields(self, operation, strict=False):
        """Helper method to process reference fields for both unlinking and resolving operations.

        Fields whose annotation contains `ObjectReference` are converted in place; fields whose
        annotation contains `List[ObjectReference]` are converted element by element. Elements
        that need no conversion are kept, so mixed lists (objects and UUIDs) never lose entries.
        Empty or unset (None) list fields are skipped.

        Args:
            operation (str): The operation to perform, either 'unlink' or 'resolve'.
            strict (bool, optional): If True, raises a ValueError when a UUID reference cannot be resolved.
                Only used for 'resolve' operation. Defaults to False.

        Raises:
            ValueError: If strict is True and a UUID reference cannot be resolved.
            AttributeError: If the class instance does not have the expected field or attribute.
        """

        def convert(ref):
            # 'unlink': replace a linked object by its identifier.
            if operation == "unlink" and ref and not isinstance(ref, UUID):
                return ref.id
            # 'resolve': replace an identifier by the object stored in the repository.
            if operation == "resolve" and isinstance(ref, UUID):
                resolved = ColanderRepository() >> ref
                # The repository hands the identifier back when the lookup fails.
                if strict and isinstance(resolved, UUID):
                    raise ValueError(f"Unable to resolve UUID reference {resolved}")
                return resolved
            return ref

        for field, info in self.__class__.model_fields.items():
            annotation_args = get_args(info.annotation)
            if ObjectReference in annotation_args:
                ref = getattr(self, field)
                new_ref = convert(ref)
                if new_ref is not ref:
                    setattr(self, field, new_ref)
            elif List[ObjectReference] in annotation_args:
                refs = getattr(self, field)
                if not refs:
                    # Optional list fields may legitimately be None or empty.
                    continue
                new_refs = [convert(ref) for ref in refs]
                if any(new is not old for new, old in zip(new_refs, refs)):
                    setattr(self, field, new_refs)

    def resolve_references(self, strict=False):
        """Resolves references for the fields in the object's model.

        Fields annotated with `ObjectReference` or `List[ObjectReference]` are processed
        to fetch and replace their UUID references with respective entities using the `Repository`.

        This method updates the object in-place.

        Args:
            strict: If True, raises a ValueError when a UUID reference cannot be resolved.
                If False, unresolved references remain as UUIDs.

        Raises:
            ValueError: If strict is True and a UUID reference cannot be resolved.
        """
        self._process_reference_fields("resolve", strict)

    def is_fully_resolved(self) -> bool:
        """Checks whether every reference field of this object points to an object.

        The references are first resolved in place (non-strict), then every field annotated
        with `ObjectReference` or `List[ObjectReference]` is inspected.

        Returns:
            bool: False if at least one reference is still a UUID after resolution, True otherwise.
        """
        self.resolve_references()

        for field, info in self.__class__.model_fields.items():
            annotation_args = get_args(info.annotation)
            if ObjectReference in annotation_args:
                if isinstance(getattr(self, field), UUID):
                    return False
            elif List[ObjectReference] in annotation_args:
                # Optional list fields may be None.
                refs = getattr(self, field) or []
                if any(isinstance(ref, UUID) for ref in refs):
                    return False

        return True

    @classmethod
    def subclasses(cls) -> Dict[str, type["EntityTypes"]]:
        """Generates a dictionary containing all subclasses of the current class.

        This method collects all the direct subclasses of the current class and maps their
        names (converted to lowercase) to the class itself. It is primarily useful for
        organizing and accessing class hierarchies dynamically.

        Returns:
            Dict[str, type['EntityTypes']]: A dictionary where the keys are the lowercase names of the subclasses, and
            the values are the subclass data_types themselves.
        """
        return {subclass.__name__.lower(): subclass for subclass in cls.__subclasses__()}

    @classmethod
    def resolve_type(cls, content_type: str) -> type["EntityTypes"]:
        """Resolves a specific type of entity definition based on the provided content type by
        matching it against the available subclasses of the class. This utility ensures that
        the given content type is valid and matches one of the registered subclasses.

        Args:
            content_type (str): A string representing the type of content to be resolved.
                Must match the name of a direct subclass of the current class (case-insensitive).

        Returns:
            type['EntityTypes']: The resolved class type corresponding to the provided content type.

        Raises:
            AssertionError: If the content type does not match any direct subclass.
        """
        _content_type = content_type.lower()
        _subclasses = cls.subclasses()
        if _content_type not in _subclasses:
            # Raised explicitly (not via `assert`) so the check also runs under `python -O`.
            raise AssertionError(
                f"Unknown content type {content_type!r}; expected one of {sorted(_subclasses)}"
            )
        return _subclasses[_content_type]

    @classmethod
    def extract_type_hints(cls, obj: dict) -> str:
        """Extracts type hints from a given dictionary based on specific keys.

        This class method attempts to retrieve type hints from a dictionary using a specific
        key ("colander_internal_type") or nested keys ("super_type" and its "short_name" value).
        If the dictionary does not match the expected structure or the keys are not available,
        a ValueError is raised.

        Args:
            obj (dict): The dictionary from which type hints need to be extracted.

        Returns:
            str: A string representing the extracted type hint.

        Raises:
            ValueError: If the type hint cannot be extracted from the provided dictionary.
        """
        try:
            if "colander_internal_type" in obj:
                return obj.get("colander_internal_type", "")
            elif "super_type" in obj:
                return obj.get("super_type").get("short_name").lower().replace("_", "")  # type: ignore[union-attr]
        except (AttributeError, KeyError, TypeError):
            # Malformed input (not a mapping, or missing/None nested values) is reported
            # through the ValueError below.
            pass
        raise ValueError("Unable to extract type hints.")

    @computed_field
    def super_type(self) -> "CommonEntitySuperType":
        """The super type descriptor of this object, included in serialized output."""
        return self.get_super_type()

    def get_super_type(self) -> "CommonEntitySuperType":
        """Builds the super type descriptor from the name of the concrete class.

        Returns:
            CommonEntitySuperType: Descriptor whose `name` is the class name, whose `short_name`
            is the upper-cased class name and whose `_class` is the class itself.
        """
        # NOTE(review): CommonEntitySuperType is not among the imports visible in this view of
        # the file - confirm it is in scope at runtime.
        return CommonEntitySuperType(
            **{
                "name": self.__class__.__name__,
                "short_name": self.__class__.__name__.upper(),
                "_class": self.__class__,
            }
        )
272 273
class Case(ColanderType):
    """Case represents a collection or grouping of related entities, artifacts, or events.

    This class is used to organize and manage related data, such as incidents, investigations, or projects.

    Example:
        >>> case = Case(
        ...     name='Investigation Alpha',
        ...     description='Investigation of suspicious activity'
        ... )
        >>> print(case.name)
        Investigation Alpha
    """

    id: UUID4 = Field(frozen=True, default_factory=uuid4)
    """The unique identifier for the case."""

    # Timestamps use default_factory so they are computed per instance; a plain
    # `default=datetime.now(UTC)` would be evaluated once, when the module is imported.
    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC), frozen=True)
    """The timestamp when the case was created."""

    updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
    """The timestamp when the case was last updated."""

    name: str = Field(..., min_length=1, max_length=512)
    """The name of the case."""

    description: str = Field(..., min_length=1)
    """A description of the case."""

    documentation: str | None = None
    """Optional documentation or notes for the case."""

    pap: TlpPapLevel = TlpPapLevel.WHITE
    """The PAP (Permissible Actions Protocol) level for the case."""

    parent_case: Optional["Case"] | Optional[ObjectReference] = None
    """Reference to a parent case, if this case is a sub-case."""

    tlp: TlpPapLevel = TlpPapLevel.WHITE
    """The TLP (Traffic Light Protocol) level for the case."""

    colander_internal_type: Literal["case"] = "case"
    """Internal type discriminator for (de)serialization."""
317 318
class Entity(ColanderType, abc.ABC):
    """Entity is an abstract base class representing a core object in the model.

    This class provides common fields for all entities, including identifiers, timestamps, descriptive fields,
    and references to cases. Examples include actors, artifacts, devices, etc.
    """

    id: UUID4 = Field(frozen=True, default_factory=uuid4)
    """The unique identifier for the entity."""

    # Timestamps use default_factory so they are computed per instance; a plain
    # `default=datetime.now(UTC)` would be evaluated once, when the module is imported.
    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC), frozen=True)
    """The timestamp when the entity was created."""

    updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
    """The timestamp when the entity was last updated."""

    name: str = Field(..., min_length=1, max_length=512)
    """The name of the entity."""

    case: Optional[Case] | Optional[ObjectReference] = None
    """Reference to the case this entity belongs to."""

    description: str | None = None
    """A description of the entity."""

    pap: TlpPapLevel = TlpPapLevel.WHITE
    """The PAP (Permissible Actions Protocol) level for the entity."""

    source_url: str | AnyUrl | None = None
    """Optional source URL for the entity."""

    tlp: TlpPapLevel = TlpPapLevel.WHITE
    """The TLP (Traffic Light Protocol) level for the entity."""

    def get_type(self) -> Optional[EntityType_T]:
        """
        Returns the type definition for this entity instance.

        This method returns the type definition object (e.g., ObservableType, ActorType, DeviceType).

        Returns:
            Optional[EntityType_T]: The value of the entity's `type` attribute, or None when the
            entity has no such attribute. The specific type depends on the entity subclass
            (e.g., Observable returns ObservableType, Actor returns ActorType, etc.).
        """
        return getattr(self, "type", None)

    def get_immutable_relations(
        self, mapping: Optional[Dict[str, str]] = None, default_name: Optional[str] = None
    ) -> Dict[str, "EntityRelation"]:
        """
        Returns a dictionary of immutable relations derived from the entity's reference fields.

        This method automatically creates EntityRelation objects by inspecting the entity's fields
        and identifying those annotated as ObjectReference or List[ObjectReference]. These represent
        the entity's connections to other entities in the knowledge graph, forming the basis for
        graph traversal and relationship analysis.

        Immutable relations are derived from the entity's structure and cannot be modified directly.
        They represent inherent relationships defined by the entity's reference fields, such as
        'extracted_from', 'operated_by', 'associated_threat', etc.

        Args:
            mapping (Dict[str, str], optional): A dictionary to customize relation names. Keys should
                be field names, and values should be the desired relation names. Defaults to None.
            default_name (str, optional): Relation name used for fields that have no entry in
                `mapping`. When it is not provided (or empty), the field name itself is used
                unchanged.

        Returns:
            Dict[str, "EntityRelation"]: A dictionary of EntityRelation objects keyed by their string
            representation of relation IDs. Each relation represents a connection from this entity
            to another entity referenced in its fields.

        Note:
            - The 'case' field is explicitly excluded from relation generation as it represents
              a grouping mechanism rather than a semantic relationship.
            - Only fields with actual values (not None or empty) are processed.
            - Each EntityRelation created has this entity as the source (obj_from) and the
              referenced entity as the target (obj_to).
            - Every call creates new EntityRelation instances with fresh IDs; like any other
              model instance they are registered in the ColanderRepository on creation.
        """
        name_mapping = mapping or {}
        relations: Dict[str, "EntityRelation"] = {}
        for field_name, field_info in self.__class__.model_fields.items():
            if field_name == "case":
                continue
            field_annotation = get_args(field_info.annotation)
            field_value = getattr(self, field_name, None)

            if not field_value or not field_annotation:
                continue

            # Normalize single references and lists of references to one list of targets.
            if ObjectReference in field_annotation:
                targets = [field_value]
            elif List[ObjectReference] in field_annotation:
                targets = field_value
            else:
                continue

            # The name only depends on the field, so it is computed once per field.
            relation_name = name_mapping.get(field_name, default_name or field_name)
            for target in targets:
                relation = EntityRelation(
                    name=relation_name,
                    obj_from=self,
                    obj_to=target,
                )
                relations[str(relation.id)] = relation

        return relations
435 436
class EntityRelation(ColanderType):
    """EntityRelation represents a relationship between two entities in the model.

    This class is used to define and manage relationships between objects, such as associations
    between observables, devices, or actors.

    Example:
        >>> obs1 = Observable(
        ...     id=uuid4(),
        ...     name='1.1.1.1',
        ...     type=ObservableTypes.IPV4.value
        ... )
        >>> obs2 = Observable(
        ...     id=uuid4(),
        ...     name='8.8.8.8',
        ...     type=ObservableTypes.IPV4.value
        ... )
        >>> relation = EntityRelation(
        ...     id=uuid4(),
        ...     name='connection',
        ...     obj_from=obs1,
        ...     obj_to=obs2
        ... )
        >>> print(relation.name)
        connection
    """

    id: UUID4 = Field(frozen=True, default_factory=uuid4)
    """The unique identifier for the entity relation."""

    # Timestamps use default_factory so they are computed per instance; a plain
    # `default=datetime.now(UTC)` would be evaluated once, when the module is imported.
    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC), frozen=True)
    """The timestamp when the entity relation was created."""

    updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
    """The timestamp when the entity relation was last updated."""

    name: str = Field(..., min_length=1, max_length=512)
    """The name of the entity relation."""

    case: Optional[Case] | Optional[ObjectReference] = None
    """Reference to the case this relation belongs to."""

    attributes: Optional[Dict[str, str]] = None
    """Dictionary of additional attributes for the relation."""

    obj_from: EntityTypes | ObjectReference = Field(...)
    """The source entity or reference in the relation."""

    obj_to: EntityTypes | ObjectReference = Field(...)
    """The target entity or reference in the relation."""
487 488
class Actor(Entity):
    """
    Actor represents an individual or group involved in an event, activity, or system.

    This class extends the Entity base class and includes additional fields specific to actors.

    Example:
        >>> actor_type = ActorTypes.INDIVIDUAL.value
        >>> actor = Actor(
        ...     name='John Doe',
        ...     type=actor_type
        ... )
        >>> print(actor.name)
        John Doe
    """

    type: ActorType
    """The type definition for the actor."""

    colander_internal_type: Literal["actor"] = "actor"
    """Internal type discriminator for (de)serialization."""

    attributes: Optional[Dict[str, str]] = None
    """Dictionary of additional attributes for the actor."""
513 514
class Device(Entity):
    """
    Device represents a physical or virtual device in Colander.

    This class extends the Entity base class and includes additional fields specific to devices,
    such as their type, attributes, and the actor operating the device.

    Example:
        >>> device_type = DeviceTypes.MOBILE.value
        >>> actor = Actor(name='John Doe', type=ActorTypes.INDIVIDUAL.value)
        >>> device = Device(
        ...     name="John's Phone",
        ...     type=device_type,
        ...     operated_by=actor,
        ...     attributes={'os': 'Android', 'version': '12'}
        ... )
        >>> print(device.name)
        John's Phone
    """

    type: DeviceType
    """The type definition for the device."""

    attributes: Optional[Dict[str, str]] = None
    """Dictionary of additional attributes for the device."""

    # Holds either a full Actor or an ObjectReference; the reference-processing helper
    # inherited from ColanderType recognizes the field by the ObjectReference member of
    # the annotation, so the annotation's shape must be kept.
    operated_by: Optional[Actor] | Optional[ObjectReference] = None
    """Reference to the actor operating the device."""

    # Value matched by the 'colander_internal_type' discriminator of EntityTypes.
    colander_internal_type: Literal["device"] = "device"
    """Internal type discriminator for (de)serialization."""
546 547
class Artifact(Entity):
    """
    Artifact represents a file or data object, such as a document, image, or binary, within the system.

    This class extends the Entity base class and includes additional fields specific to artifacts,
    such as type, attributes, extraction source, file metadata, and cryptographic hashes.

    Example:
        >>> artifact_type = ArtifactTypes.DOCUMENT.value
        >>> device_type = DeviceTypes.LAPTOP.value
        >>> device = Device(name='Analyst Laptop', type=device_type)
        >>> artifact = Artifact(
        ...     name='malware_sample.pdf',
        ...     type=artifact_type,
        ...     extracted_from=device,
        ...     extension='pdf',
        ...     original_name='invoice.pdf',
        ...     mime_type='application/pdf',
        ...     md5='d41d8cd98f00b204e9800998ecf8427e',
        ...     sha1='da39a3ee5e6b4b0d3255bfef95601890afd80709',
        ...     sha256='e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855',
        ...     size_in_bytes=12345
        ... )
        >>> print(artifact.name)
        malware_sample.pdf
    """

    type: ArtifactType
    """The type definition for the artifact."""

    attributes: Optional[Dict[str, str]] = None
    """Dictionary of additional attributes for the artifact."""

    # Holds either a full Device or an ObjectReference; the reference-processing helper
    # inherited from ColanderType recognizes the field by the ObjectReference member of
    # the annotation, so the annotation's shape must be kept.
    extracted_from: Optional[Device] | Optional[ObjectReference] = None
    """Reference to the device from which this artifact was extracted."""

    extension: str | None = None
    """The file extension of the artifact, if applicable."""

    original_name: str | None = None
    """The original name of the artifact before ingestion."""

    mime_type: str | None = None
    """The MIME type of the artifact."""

    detached_signature: str | None = None
    """Optional detached signature for the artifact."""

    # The hash fields are plain optional strings; no format or length constraint is declared.
    md5: str | None = None
    """MD5 hash of the artifact."""

    sha1: str | None = None
    """SHA1 hash of the artifact."""

    sha256: str | None = None
    """SHA256 hash of the artifact."""

    size_in_bytes: NonNegativeInt = 0
    """The size of the artifact in bytes."""

    # Value matched by the 'colander_internal_type' discriminator of EntityTypes.
    colander_internal_type: Literal["artifact"] = "artifact"
    """Internal type discriminator for (de)serialization."""
610 611
class DataFragment(Entity):
    """
    DataFragment represents a fragment of data, such as a code snippet, text, or other content.

    This class extends the Entity base class and includes additional fields specific to data fragments,
    such as their type, content, and the artifact from which they were extracted.

    Example:
        >>> data_fragment_type = DataFragmentTypes.CODE.value
        >>> artifact = Artifact(
        ...     name='example_artifact',
        ...     type=ArtifactTypes.DOCUMENT.value
        ... )
        >>> data_fragment = DataFragment(
        ...     name='Sample Code',
        ...     type=data_fragment_type,
        ...     content='print("Hello, World!")',
        ...     extracted_from=artifact
        ... )
        >>> print(data_fragment.content)
        print("Hello, World!")
    """

    type: DataFragmentType
    """The type definition for the data fragment."""

    # Required field: no default is declared.
    content: str
    """The content of the data fragment."""

    # Holds either a full Artifact or an ObjectReference; the reference-processing helper
    # inherited from ColanderType recognizes the field by the ObjectReference member of
    # the annotation, so the annotation's shape must be kept.
    extracted_from: Optional[Artifact] | Optional[ObjectReference] = None
    """Reference to the artifact from which this data fragment was extracted."""

    # Value matched by the 'colander_internal_type' discriminator of EntityTypes
    # (lower-cased class name, without separator).
    colander_internal_type: Literal["datafragment"] = "datafragment"
    """Internal type discriminator for (de)serialization."""
646 647
class Threat(Entity):
    """
    Threat represents a threat entity, such as a malware family, campaign, or adversary.

    This class extends the Entity base class and includes a type field for threat classification.
    It declares no reference fields of its own besides those inherited from Entity.

    Example:
        >>> threat_type = ThreatTypes.TROJAN.value
        >>> threat = Threat(
        ...     name='Emotet',
        ...     type=threat_type
        ... )
        >>> print(threat.name)
        Emotet
    """

    type: ThreatType
    """The type definition for the threat."""

    # Value matched by the 'colander_internal_type' discriminator of EntityTypes.
    colander_internal_type: Literal["threat"] = "threat"
    """Internal type discriminator for (de)serialization."""
669 670
class Observable(Entity):
    """
    Observable represents an entity that can be observed or detected within the system.

    This class extends the Entity base class and includes additional fields specific to observables,
    such as classification, raw value, extraction source, associated threat, and operator.

    Example:
        >>> ot = ObservableTypes.IPV4.value
        >>> obs = Observable(
        ...     name='1.2.3.4',
        ...     type=ot,
        ...     classification='malicious',
        ...     raw_value='1.2.3.4',
        ...     attributes={'asn': 'AS123'}
        ... )
        >>> print(obs.name)
        1.2.3.4
    """

    type: ObservableType = Field(...)
    """The type definition for the observable."""

    attributes: Optional[Dict[str, str]] = None
    """Dictionary of additional attributes for the observable."""

    classification: str | None = Field(default=None, max_length=512)
    """Optional classification label for the observable."""

    raw_value: str | None = None
    """The raw value associated with the observable."""

    # The three fields below hold either a full object or an ObjectReference; the
    # reference-processing helper inherited from ColanderType recognizes them by the
    # ObjectReference member of the annotation, so the annotations' shape must be kept.
    extracted_from: Optional[Artifact] | Optional[ObjectReference] = None
    """Reference to the artifact from which this observable was extracted."""

    associated_threat: Optional[Threat] | Optional[ObjectReference] = None
    """Reference to an associated threat."""

    operated_by: Optional[Actor] | Optional[ObjectReference] = None
    """Reference to the actor operating this observable."""

    # Value matched by the 'colander_internal_type' discriminator of EntityTypes.
    colander_internal_type: Literal["observable"] = "observable"
    """Internal type discriminator for (de)serialization."""
714 715
class DetectionRule(Entity):
    """
    DetectionRule represents a rule used for detecting specific content or logic related to observables or
    object references.

    This class is designed to encapsulate detection rules that can be applied across various systems or platforms to
    identify patterns or conditions defined by the user.

    Example:
        >>> drt = DetectionRuleTypes.YARA.value
        >>> rule = DetectionRule(
        ...     name='Detect Malicious IP',
        ...     type=drt,
        ...     content='rule malicious_ip { condition: true }',
        ... )
        >>> print(rule.name)
        Detect Malicious IP
    """

    type: DetectionRuleType
    """The type definition for the detection rule."""

    # Required field: no default is declared.
    content: str
    """The content or logic of the detection rule."""

    # List-valued reference field. Its default is None (not an empty list), so code that
    # iterates over it has to handle the unset case.
    targeted_observables: Optional[List[Observable]] | Optional[List[ObjectReference]] = None
    """List of observables or references targeted by this detection rule."""

    # Value matched by the 'colander_internal_type' discriminator of EntityTypes
    # (lower-cased class name, without separator).
    colander_internal_type: Literal["detectionrule"] = "detectionrule"
    """Internal type discriminator for (de)serialization."""
746 747
class Event(Entity):
    """
    Event represents an occurrence or activity observed within a system, such as a detection, alert, or log entry.

    This class extends the Entity base class and includes additional fields specific to events,
    such as timestamps, count, involved observables, and references to related entities.

    Example:
        >>> et = EventTypes.HIT.value
        >>> obs_type = ObservableTypes.IPV4.value
        >>> obs = Observable(
        ...     id=uuid4(),
        ...     name='8.8.8.8',
        ...     type=obs_type
        ... )
        >>> event = Event(
        ...     name='Suspicious Connection',
        ...     type=et,
        ...     first_seen=datetime(2024, 6, 1, 12, 0, tzinfo=UTC),
        ...     last_seen=datetime(2024, 6, 1, 12, 5, tzinfo=UTC),
        ...     involved_observables=[obs]
        ... )
        >>> print(event.name)
        Suspicious Connection
    """

    type: EventType
    """The type definition for the event."""

    attributes: Optional[Dict[str, str]] = None
    """Dictionary of additional attributes for the event."""

    # Both timestamps default to the moment the instance is created. A plain
    # `= datetime.now(UTC)` would be evaluated once at import time, which made
    # `Event(first_seen=datetime.now(UTC))` fail the date check below because the
    # defaulted last_seen was older than the supplied first_seen.
    first_seen: datetime = Field(default_factory=lambda: datetime.now(UTC))
    """The timestamp when the event was first observed."""

    last_seen: datetime = Field(default_factory=lambda: datetime.now(UTC))
    """The timestamp when the event was last observed."""

    count: PositiveInt = 1
    """The number of times this event was observed."""

    extracted_from: Optional[Artifact] | Optional[ObjectReference] = None
    """Reference to the artifact from which this event was extracted."""

    observed_on: Optional[Device] | Optional[ObjectReference] = None
    """Reference to the device on which this event was observed."""

    detected_by: Optional[DetectionRule] | Optional[ObjectReference] = None
    """Reference to the detection rule that detected this event."""

    # ToDo: missing attribute in Colander implementation
    attributed_to: Optional[Actor] | Optional[ObjectReference] = None
    """Reference to the actor attributed to this event."""

    # ToDo: missing attribute in Colander implementation
    target: Optional[Actor] | Optional[ObjectReference] = None
    """Reference to the actor targeted during this event."""

    involved_observables: List[Observable] | List[ObjectReference] = []
    """List of observables or references involved in this event."""

    colander_internal_type: Literal["event"] = "event"
    """Internal type discriminator for (de)serialization."""

    @model_validator(mode="after")
    def _check_dates(self) -> Any:
        """Rejects events whose first_seen is later than their last_seen (equal values are accepted).

        Raises:
            ValueError: If first_seen is later than last_seen.
        """
        if self.first_seen > self.last_seen:
            raise ValueError("first_seen must be before last_seen")
        return self
817 818
class ColanderRepository(object, metaclass=Singleton):
    """Singleton repository for managing and storing Case, Entity, and EntityRelation objects.

    The repository keeps one dictionary per kind of object, each keyed by the string form of
    the object's ID. It supports insertion (`<<`), lookup (`>>`) and reference resolution.
    """

    cases: Dict[str, Case]
    entities: Dict[str, EntityTypes]
    relations: Dict[str, EntityRelation]

    def __init__(self):
        """Creates the three empty stores (cases, entities, relations)."""
        self.cases, self.entities, self.relations = {}, {}, {}

    def clear(self):
        """Empties the three stores in place."""
        for store in (self.cases, self.entities, self.relations):
            store.clear()

    def __lshift__(self, other: EntityTypes | Case) -> None:
        """Stores an object in the dictionary matching its kind.

        Objects that are neither an Entity, an EntityRelation nor a Case are ignored.

        Args:
            other: The object (Entity, EntityRelation, or Case) to insert.
        """
        if isinstance(other, Entity):
            target = self.entities
        elif isinstance(other, EntityRelation):
            target = self.relations
        elif isinstance(other, Case):
            target = self.cases
        else:
            return
        target[str(other.id)] = other

    def __rshift__(self, other: str | UUID4) -> EntityTypes | EntityRelation | Case | str | UUID4:
        """Looks an identifier up in entities, then relations, then cases.

        Args:
            other: The string or UUID identifier to look up.

        Returns:
            The first matching object, or the identifier itself when nothing matches.
        """
        key = str(other)
        for store in (self.entities, self.relations, self.cases):
            if key in store:
                return store[key]
        return other

    def resolve_references(self):
        """Resolves all UUID references in entities, relations, and cases to their corresponding objects."""
        for store in (self.entities, self.relations, self.cases):
            for stored in store.values():
                stored.resolve_references()
889 890
[docs] 891class ColanderFeed(ColanderType): 892 """ColanderFeed aggregates entities, relations, and cases for bulk operations or data exchange. 893 894 This class is used to load, manage, and resolve references for collections of model objects. 895 896 Example: 897 >>> feed_data = { 898 ... "entities": { 899 ... "204d4590-a3ee-4f24-8eaf-350ec2fa751b": { 900 ... "id": "204d4590-a3ee-4f24-8eaf-350ec2fa751b", 901 ... "name": "Example Observable", 902 ... "type": {"name": "IPv4", "short_name": "IPV4"}, 903 ... "super_type": {"short_name": "observable"}, 904 ... "colander_internal_type": "observable" 905 ... } 906 ... }, 907 ... "relations": {}, 908 ... "cases": {} 909 ... } 910 >>> feed = ColanderFeed.load(feed_data) 911 >>> print(list(feed.entities.keys())) 912 ['204d4590-a3ee-4f24-8eaf-350ec2fa751b'] 913 """ 914 915 id: UUID4 = Field(frozen=True, default_factory=lambda: uuid4()) 916 """The unique identifier for the feed.""" 917 918 name: str = "" 919 """Optional name of the feed.""" 920 921 description: str = "" 922 """Optional description of the feed.""" 923 924 entities: Optional[Dict[str, EntityTypes]] = {} 925 """Dictionary of entity objects, keyed by their IDs.""" 926 927 relations: Optional[Dict[str, EntityRelation]] = {} 928 """Dictionary of entity relations, keyed by their IDs.""" 929 930 cases: Optional[Dict[str, Case]] = {} 931 """Dictionary of case objects, keyed by their IDs.""" 932
[docs] 933 @staticmethod 934 def load(raw_object: dict | list) -> "ColanderFeed": 935 """Loads an EntityFeed from a raw object, which can be either a dictionary or a list. 936 937 Args: 938 raw_object: The raw data representing the entities and relations to be loaded into 939 the EntityFeed. 940 941 Returns: 942 The EntityFeed loaded from a raw object. 943 944 Raises: 945 ValueError: If there are inconsistencies in entity IDs or relations. 946 """ 947 if "entities" in raw_object: 948 for entity_id, entity in raw_object["entities"].items(): 949 if entity_id != entity.get("id"): 950 raise ValueError(f"Relation {entity_id} does not match with the ID of {entity}") 951 entity["colander_internal_type"] = entity["super_type"]["short_name"].lower() 952 if "relations" in raw_object: 953 for relation_id, relation in raw_object["relations"].items(): 954 if relation_id != relation.get("id"): 955 raise ValueError(f"Relation {relation_id} does not match with the ID of {relation}") 956 if ( 957 "obj_from" not in relation 958 and "obj_to" not in relation 959 and "obj_from_id" in relation 960 and "obj_to_id" in relation 961 ): 962 relation["obj_from"] = relation["obj_from_id"] 963 relation["obj_to"] = relation["obj_to_id"] 964 entity_feed = ColanderFeed.model_validate(raw_object) 965 entity_feed.resolve_references() 966 for _, entity in entity_feed.entities.items(): 967 entity.resolve_references() 968 for _, relation in entity_feed.relations.items(): 969 relation.resolve_references() 970 for _, case in entity_feed.cases.items(): 971 case.resolve_references() 972 return entity_feed
973
def resolve_references(self, strict=False):
    """Ask every object held by the feed to resolve its references.

    Entities are processed first, then relations, then cases; each object's own
    ``resolve_references`` method is called with the given ``strict`` flag.

    Args:
        strict: If True, raises a ValueError when a UUID reference cannot be resolved.
            If False, unresolved references remain as UUIDs.
    """
    # Collections are looked up lazily, one after the other, in this fixed order.
    for collection_name in ("entities", "relations", "cases"):
        for member in getattr(self, collection_name).values():
            member.resolve_references(strict=strict)
991 1006
def contains(self, obj: Any) -> bool:
    """Check if an object exists in the current feed by its identifier.

    The identifier is extracted with ``get_id()`` and its string form is searched
    among the keys of the feed's collections: entities, relations, and cases.

    Args:
        obj (Any): The object to check for existence. Can be:
            - An entity, relation, or case object with an 'id' attribute
            - A string or UUID representing an object ID
            - Any object that can be processed by get_id()

    Returns:
        bool: True if the object exists in entities, relations, or cases;
        False otherwise (including when no identifier can be extracted).

    Example:
        >>> feed = ColanderFeed()
        >>> obs = Observable(name="test", type=ObservableTypes.IPV4.value)
        >>> feed.entities[str(obs.id)] = obs
        >>> feed.contains(obs)
        True
        >>> feed.contains("nonexistent-id")
        False
    """
    object_uuid = get_id(obj)
    if object_uuid is None:
        # Test the identifier itself: str(None) is the truthy string "None", so
        # checking the stringified value would never take this early exit.
        return False

    object_id = str(object_uuid)
    # The collections are declared Optional; an unset (None) or empty one cannot match.
    return any(
        object_id in collection
        for collection in (self.entities, self.relations, self.cases)
        if collection
    )
1045
def get(self, obj: Any) -> Optional[Union[Case, EntityTypes, EntityRelation]]:
    """Retrieve an object from the feed by its identifier.

    The identifier is extracted once with ``get_id()``; its string form is then looked
    up in the entities, the relations and finally the cases, and the first match is
    returned.

    Args:
        obj (Any): The object to retrieve. Can be:
            - An entity, relation, or case object with an 'id' attribute
            - A string or UUID representing an object ID
            - Any object that can be processed by get_id()

    Returns:
        Optional[Union[Case, EntityTypes, EntityRelation]]: The found object if it exists
        in any of the collections (entities, relations, or cases), otherwise None. None is
        also returned when no identifier can be extracted from ``obj``.
    """
    object_uuid = get_id(obj)
    if object_uuid is None:
        # Avoid looking up the literal key "None" (the result of str(None)).
        return None

    object_id = str(object_uuid)
    for collection in (self.entities, self.relations, self.cases):
        # The collections are declared Optional; skip unset or empty ones.
        if collection and object_id in collection:
            return collection[object_id]

    return None
1076
def get_by_super_type(self, super_type: "CommonEntitySuperType") -> List[EntityTypes]:
    """Return the feed's entities that are instances of a super type's model class.

    Args:
        super_type: The super type whose ``model_class`` is used for the ``isinstance`` check.

    Returns:
        The matching entities, in the iteration order of ``self.entities``.
    """
    return [
        candidate
        for candidate in self.entities.values()
        if isinstance(candidate, super_type.model_class)
    ]
1083
def get_incoming_relations(self, entity: EntityTypes) -> Dict[str, EntityRelation]:
    """Collect the relations of the feed that point to the given entity.

    A relation is returned when it reports itself as fully resolved and its
    ``obj_to`` compares equal to ``entity``. Relations that are not fully resolved
    are ignored.

    Args:
        entity (EntityTypes): The entity to find incoming relations for. Must be an instance of Entity.

    Returns:
        Dict[str, EntityRelation]: A dictionary mapping relation IDs to EntityRelation objects where the entity
        is the target (obj_to).
    """
    assert isinstance(entity, Entity)
    # `and` short-circuits, so obj_to is only compared for fully resolved relations.
    return {
        rel_id: rel
        for rel_id, rel in self.relations.items()
        if rel.is_fully_resolved() and rel.obj_to == entity
    }
1106
def get_outgoing_relations(self, entity: EntityTypes, exclude_immutables=True) -> Dict[str, EntityRelation]:
    """Retrieve all relations where the specified entity is the source (obj_from).

    The relations stored in the feed are scanned and those that are fully resolved and
    whose ``obj_from`` equals ``entity`` are returned. When ``exclude_immutables`` is
    False, the immutable relations exposed by the feed's entities
    (``get_immutable_relations()``) are added as well, restricted to those whose
    ``obj_from`` equals ``entity``.

    Args:
        entity (EntityTypes): The entity to find outgoing relations for. Must be an instance of Entity.
        exclude_immutables (bool): If True, exclude immutable relations.

    Returns:
        Dict[str, EntityRelation]: A dictionary mapping relation IDs to EntityRelation objects where the entity
        is the source (obj_from).
    """
    assert isinstance(entity, Entity)
    relations = {}

    if not exclude_immutables:
        # The loop variable must not be called `entity`: rebinding the parameter made
        # the comparison further down run against the last entity of the feed.
        for candidate in self.entities.values():
            # dict() accepts whatever dict.update() accepted in the previous implementation.
            for relation_id, relation in dict(candidate.get_immutable_relations()).items():
                # Keep only what the contract promises: relations starting at `entity`.
                if relation.obj_from == entity:
                    relations[relation_id] = relation

    for relation_id, relation in self.relations.items():
        if not relation.is_fully_resolved():
            continue
        if relation.obj_from == entity:
            relations[relation_id] = relation

    return relations
1133
def get_relations(self, entity: EntityTypes, exclude_immutables=True) -> Dict[str, EntityRelation]:
    """Return every relation of the feed that involves the given entity.

    The result merges ``get_incoming_relations()`` and ``get_outgoing_relations()``;
    when a relation ID appears in both, the outgoing entry is the one kept.

    Args:
        entity (EntityTypes): The entity to find all relations for. Must be an instance of Entity.
        exclude_immutables (bool): If True, exclude immutable relations. Only forwarded to
            ``get_outgoing_relations()``.

    Returns:
        Dict[str, EntityRelation]: A dictionary mapping relation IDs to EntityRelation objects where the entity
        is either the source (obj_from) or target (obj_to).
    """
    assert isinstance(entity, Entity)

    incoming = self.get_incoming_relations(entity)
    outgoing = self.get_outgoing_relations(entity, exclude_immutables=exclude_immutables)
    return {**incoming, **outgoing}
1156
def filter(
    self,
    maximum_tlp_level: TlpPapLevel,
    include_relations=True,
    include_cases=True,
    exclude_entity_types: Optional[List[EntityTypes]] = None,
) -> "ColanderFeed":
    """Build a new feed restricted by TLP (Traffic Light Protocol) level.

    The references of the current feed are resolved first. The new feed reuses the name
    and description of this one and receives the entities whose TLP value is strictly
    below the threshold and whose exact type is not excluded. Relations and cases are
    then added on request.

    Args:
        maximum_tlp_level (TlpPapLevel): The TLP threshold. Only entities (and cases)
            with TLP values strictly below this value are included.
        include_relations (bool, optional): If True, includes relations where both
            source and target entities are present in the filtered feed. Defaults to True.
        include_cases (bool, optional): If True, includes cases associated with the
            filtered entities, provided the case itself is below the threshold.
            Defaults to True.
        exclude_entity_types (Optional[List[EntityTypes]], optional): If provided, entities
            whose type is one of these are excluded (exact type match).

    Returns:
        ColanderFeed: A new filtered feed containing entities, relations, and cases that meet the
        specified criteria.
    """
    assert isinstance(maximum_tlp_level, TlpPapLevel)

    threshold = maximum_tlp_level.value
    skipped_types = exclude_entity_types or []

    self.resolve_references()
    result = ColanderFeed(name=self.name, description=self.description)

    # First pass: keep the entities that satisfy the TLP and type constraints.
    for key, candidate in self.entities.items():
        if candidate.tlp.value < threshold and type(candidate) not in skipped_types:
            result.entities[key] = candidate

    # Second pass: attach what is related to the kept entities.
    for kept in result.entities.values():
        if include_relations:
            for relation_id, relation in self.get_relations(kept).items():
                # A relation is only kept when both of its ends made it into the result.
                if not result.contains(relation.obj_from):
                    continue
                if result.contains(relation.obj_to):
                    result.relations[relation_id] = relation
        if include_cases:
            case = self.get(kept.case)
            if case is not None and case.tlp.value < threshold:
                result.cases[str(case.id)] = case

    result.resolve_references()
    return result
1211 1212
class CommonEntitySuperType(BaseModel):
    """
    CommonEntitySuperType defines metadata for a super type of entities in the Colander data model.

    This class is used to represent high-level categories of entities (such as Actor, Artifact, Device, etc.)
    and provides fields for the short name, display name, associated types, and the Python class implementing the entity.
    """

    short_name: str = Field(frozen=True, max_length=32)
    """A short name for the model type."""

    name: str = Field(frozen=True, max_length=512)
    """The name of the model type."""

    types: Optional[List[object]] = Field(default=None, exclude=True)
    """Optional reference to the enum or collection of supported types."""

    model_class: Any = Field(default=None, exclude=True)
    """The Python class associated with this super type (Observable...)."""

    type_class: Any = Field(default=None, exclude=True)
    """The Python class associated with the entity type (ObservableType...)."""

    default_type: Any = Field(default=None, exclude=True)
    """The default entity type (GENERIC...)."""

    def type_by_short_name(self, short_name: str):
        """Return the entity type registered under the given short name.

        The upper-cased short name is looked up as an attribute on each object of
        ``types``; the ``value`` of the first match is returned. When nothing matches,
        or when ``types`` is not set, the ``value`` of ``default_type`` is returned.

        Args:
            short_name: The short name of the wanted type; matching ignores case.

        Returns:
            The ``value`` of the matching type, or of ``default_type`` as a fallback.
        """
        wanted = short_name.upper()
        # `types` defaults to None; treat that as "no known types" instead of failing.
        for candidate in self.types or ():
            if hasattr(candidate, wanted):
                return getattr(candidate, wanted).value
        return self.default_type.value

    def __str__(self):
        return self.short_name

    def __repr__(self):
        return self.short_name
1250 1251
class CommonEntitySuperTypes(enum.Enum):
    """
    CommonEntitySuperTypes is an enumeration of all super types for entities in the Colander data model.

    Each member of this enum represents a high-level entity category (such as Actor, Artifact, Device, etc.)
    and holds a CommonEntitySuperType instance containing metadata and references to the corresponding
    entity class and its supported types.

    This enum is used for type resolution and validation across the model.

    Example:
        >>> super_type = CommonEntitySuperTypes.ACTOR.value
        >>> print(super_type.name)
        Actor
    """

    ACTOR = CommonEntitySuperType(
        short_name="ACTOR",
        name="Actor",
        model_class=Actor,
        type_class=ActorType,
        default_type=ActorTypes.default,
        types=list(ActorTypes),
    )
    ARTIFACT = CommonEntitySuperType(
        short_name="ARTIFACT",
        name="Artifact",
        model_class=Artifact,
        type_class=ArtifactType,
        default_type=ArtifactTypes.default,
        types=list(ArtifactTypes),
    )
    DATA_FRAGMENT = CommonEntitySuperType(
        short_name="DATAFRAGMENT",
        name="Data fragment",
        model_class=DataFragment,
        type_class=DataFragmentType,
        default_type=DataFragmentTypes.default,
        types=list(DataFragmentTypes),
    )
    DETECTION_RULE = CommonEntitySuperType(
        short_name="DETECTIONRULE",
        name="Detection rule",
        model_class=DetectionRule,
        type_class=DetectionRuleType,
        default_type=DetectionRuleTypes.default,
        types=list(DetectionRuleTypes),
    )
    DEVICE = CommonEntitySuperType(
        short_name="DEVICE",
        name="Device",
        model_class=Device,
        type_class=DeviceType,
        default_type=DeviceTypes.default,
        types=list(DeviceTypes),
    )
    EVENT = CommonEntitySuperType(
        short_name="EVENT",
        name="Event",
        model_class=Event,
        type_class=EventType,
        default_type=EventTypes.default,
        types=list(EventTypes),
    )
    OBSERVABLE = CommonEntitySuperType(
        short_name="OBSERVABLE",
        name="Observable",
        model_class=Observable,
        type_class=ObservableType,
        default_type=ObservableTypes.default,
        types=list(ObservableTypes),
    )
    THREAT = CommonEntitySuperType(
        short_name="THREAT",
        name="Threat",
        model_class=Threat,
        type_class=ThreatType,
        default_type=ThreatTypes.default,
        types=list(ThreatTypes),
    )

    @classmethod
    def by_short_name(cls, short_name: str) -> Optional[CommonEntitySuperType]:
        """Return the super type matching a short name, or None if there is none.

        The lookup ignores case. The name is first matched against the enum member
        names after replacing spaces with underscores (``"data fragment"`` ->
        ``DATA_FRAGMENT``). If that fails, it is matched against the ``short_name``
        of each member with underscores removed, so that the actual short names
        ``"DATAFRAGMENT"`` and ``"DETECTIONRULE"`` are found as well.

        Args:
            short_name: The short name (or member name) of the wanted super type.

        Returns:
            The matching CommonEntitySuperType, or None when nothing matches or the
            given name is empty.
        """
        if not short_name:
            return None

        member_name = short_name.replace(" ", "_").upper()
        if member_name in cls.__members__:
            return cls[member_name].value

        # Member names and short names differ for some members
        # (DATA_FRAGMENT vs "DATAFRAGMENT"), so fall back to the declared short names.
        compact = member_name.replace("_", "")
        for member in cls:
            if member.value.short_name.upper() == compact:
                return member.value
        return None