1from datetime import datetime, UTC
2from typing import Dict, Optional, TYPE_CHECKING, Literal, List, TypeVar, Annotated, Union, Generator, Type
3from uuid import uuid4
4
5from pydantic import BaseModel, Field, ConfigDict
6
7from colander_data_converter.base.common import Singleton
8
9# Avoid circular imports
10if TYPE_CHECKING:
11 pass
12
13
[docs]
class Stix2Repository(metaclass=Singleton):
    """
    Singleton registry of STIX2 objects, keyed by their ``id``.

    The repository is the central place where STIX2 objects are stored and looked up,
    supporting conversion to and from Colander data. Objects are added with the ``<<``
    operator and fetched with the ``>>`` operator.
    """

    # Maps a STIX2 identifier to the object registered under it.
    stix2_objects: Dict[str, "Stix2ObjectTypes"]

    def __init__(self):
        """
        Set up the repository with no registered objects.
        """
        self.stix2_objects = dict()

    def __lshift__(self, stix2_object: "Stix2ObjectTypes") -> None:
        """
        Register a STIX2 object (``repository << stix2_object``).

        An object already stored under the same ``id`` is replaced.

        Args:
            stix2_object (Stix2ObjectTypes): The STIX2 object to store; its ``id``
                attribute is used as the key.
        """
        object_id = stix2_object.id
        self.stix2_objects[object_id] = stix2_object

    def __rshift__(self, object_id: str) -> Optional["Stix2ObjectTypes"]:
        """
        Look up a STIX2 object by identifier (``repository >> object_id``).

        Args:
            object_id (str): The ID of the STIX2 object to retrieve.

        Returns:
            Optional[Stix2ObjectTypes]: The registered object, or None when no object
            is stored under ``object_id``.
        """
        registry = self.stix2_objects
        return registry.get(object_id)

    def clear(self) -> None:
        """
        Drop every registered STIX2 object.

        The underlying dictionary is emptied in place rather than replaced.
        """
        registry = self.stix2_objects
        registry.clear()
56
57
# Type variable for helpers that are generic over a concrete STIX2 model class
# (any subclass of Stix2ObjectBase).
Stix2Object_T = TypeVar("Stix2Object_T", bound="Stix2ObjectBase")
# Union of every supported STIX2 model. The members are written as strings (forward
# references) because the classes are declared further down in this module.
# ``Field(discriminator="type")`` makes pydantic select the member from the value of
# the ``type`` field, so each member must declare ``type`` as a distinct Literal.
Stix2ObjectTypes = Annotated[
    Union[
        "File",
        "Indicator",
        "Infrastructure",
        "Identity",
        "Malware",
        "ThreatActor",
        "Relationship",
    ],
    Field(discriminator="type"),
]
71
72
[docs]
class Stix2ObjectBase(BaseModel):
    """
    Base model for the STIX2 object types declared in this module.

    Besides the common fields, it provides class-level helpers to discover the concrete
    models (its direct subclasses) and to map a STIX2 ``type`` string to a model class.
    Every instance registers itself in the :class:`Stix2Repository` singleton once it
    has been initialized.
    """

    type: str = Field(..., frozen=True)
    # The timestamps must come from a default_factory: a plain
    # ``default=datetime.now(UTC).isoformat()`` is evaluated only once, when the class
    # body runs, which would stamp every object with the module import time.
    created: str = Field(frozen=True, default_factory=lambda: datetime.now(UTC).isoformat())
    modified: str = Field(default_factory=lambda: datetime.now(UTC).isoformat())
    description: str | None = None

    model_config = ConfigDict(
        str_strip_whitespace=True,
        arbitrary_types_allowed=True,
        extra="allow",
    )

    def model_post_init(self, __context):
        """Executes post-initialization logic for the model, ensuring the repository
        registers the current subclass instance.

        The repository stores the instance under ``self.id``; that attribute is declared
        by the subclasses in this module, not by this base class.

        Args:
            __context (Any): Additional context provided for post-initialization handling.
        """
        repository = Stix2Repository()
        repository << self

    @classmethod
    def subclasses(cls) -> Dict[str, Type["Stix2ObjectBase"]]:
        """Return the direct subclasses of ``cls`` keyed by lowercased class name.

        Only direct subclasses are listed (``__subclasses__`` is not recursive). The key
        is derived from the Python class name, not from the STIX2 ``type`` value.

        Returns:
            Dict[str, Type[Stix2ObjectBase]]: Mapping of lowercased class name to class.
        """
        return {subclass.__name__.lower(): subclass for subclass in cls.__subclasses__()}

    @classmethod
    def get_supported_types(cls) -> List[str]:
        """Return the STIX2 ``type`` values handled by the direct subclasses of ``cls``.

        Returns:
            List[str]: The default of the ``type`` field of each direct subclass.
        """
        return [subclass.model_fields["type"].default for subclass in cls.subclasses().values()]

    @classmethod
    def get_model_class(cls, type_name: str) -> Optional[Type["Stix2ObjectBase"]]:
        """Find the model class whose ``type`` field defaults to ``type_name``.

        Args:
            type_name (str): A STIX2 ``type`` value, e.g. ``"threat-actor"``.

        Returns:
            Optional[Type[Stix2ObjectBase]]: The matching direct subclass, or None when
            no subclass declares that type.
        """
        for subclass in cls.subclasses().values():
            if type_name == subclass.model_fields["type"].default:
                return subclass
        return None
115
116
[docs]
class File(Stix2ObjectBase):
    """STIX2 ``file`` object with a required ``name``.

    The ``id`` defaults to ``file--`` followed by a freshly generated UUID4.
    """

    id: str = Field(default_factory=lambda: f"file--{uuid4()}", frozen=True)
    type: Literal["file"] = "file"
    name: str
121
122
[docs]
class Indicator(Stix2ObjectBase):
    """STIX2 ``indicator`` object with a required ``name``.

    The ``id`` defaults to ``indicator--`` followed by a freshly generated UUID4;
    ``pattern`` defaults to an empty string and ``pattern_type`` to ``"stix"``.
    """

    id: str = Field(default_factory=lambda: f"indicator--{uuid4()}", frozen=True)
    type: Literal["indicator"] = "indicator"
    name: str
    pattern: str = Field(default="")
    pattern_type: str = Field(default="stix")
129
130
[docs]
class Infrastructure(Stix2ObjectBase):
    """STIX2 ``infrastructure`` object with a required ``name``.

    The ``id`` defaults to ``infrastructure--`` followed by a freshly generated UUID4;
    ``infrastructure_types`` defaults to an empty list.
    """

    id: str = Field(default_factory=lambda: f"infrastructure--{uuid4()}", frozen=True)
    type: Literal["infrastructure"] = "infrastructure"
    name: str
    infrastructure_types: List[str] = Field(default=[])
136
137
[docs]
class Identity(Stix2ObjectBase):
    """STIX2 ``identity`` object with a required ``name``.

    The ``id`` defaults to ``identity--`` followed by a freshly generated UUID4;
    ``identity_class`` defaults to an empty string.
    """

    id: str = Field(default_factory=lambda: f"identity--{uuid4()}", frozen=True)
    type: Literal["identity"] = "identity"
    name: str
    identity_class: str = Field(default="")
143
144
[docs]
class Malware(Stix2ObjectBase):
    """STIX2 ``malware`` object with a required ``name``.

    The ``id`` defaults to ``malware--`` followed by a freshly generated UUID4;
    ``malware_types`` defaults to an empty list.
    """

    id: str = Field(default_factory=lambda: f"malware--{uuid4()}", frozen=True)
    type: Literal["malware"] = "malware"
    name: str
    malware_types: List[str] = Field(default=[])
150
151
[docs]
class ThreatActor(Stix2ObjectBase):
    """STIX2 ``threat-actor`` object with a required ``name``.

    The ``id`` defaults to ``threat-actor--`` followed by a freshly generated UUID4;
    ``threat_actor_types`` defaults to an empty list.
    """

    id: str = Field(default_factory=lambda: f"threat-actor--{uuid4()}", frozen=True)
    type: Literal["threat-actor"] = "threat-actor"
    name: str
    threat_actor_types: List[str] = Field(default=[])
157
158
[docs]
class Relationship(Stix2ObjectBase):
    """STIX2 ``relationship`` object linking two objects by identifier.

    ``source_ref`` and ``target_ref`` are required and hold the ids of the linked
    objects; ``relationship_type`` defaults to an empty string. The ``id`` defaults to
    ``relationship--`` followed by a freshly generated UUID4.
    """

    id: str = Field(default_factory=lambda: f"relationship--{uuid4()}", frozen=True)
    type: Literal["relationship"] = "relationship"
    relationship_type: str = Field(default="")
    source_ref: str = Field(...)
    target_ref: str = Field(...)
165
166
[docs]
class Stix2Bundle(BaseModel):
    """
    STIX 2.1 bundle holding the supported STIX2 objects.

    The ``id`` defaults to ``bundle--`` followed by a freshly generated UUID4. Use
    :meth:`load` to build a bundle from a raw dictionary that may contain unsupported
    object types or relationships pointing outside the bundle.
    """

    id: str = Field(frozen=True, default_factory=lambda: f"bundle--{uuid4()}")
    type: Literal["bundle"] = "bundle"
    spec_version: Literal["2.1"] = "2.1"
    objects: List[Stix2ObjectTypes] = []

    def by_type(self, object_type: Type["Stix2Object_T"]) -> Generator[Stix2Object_T, None, None]:
        """Yield the objects of the bundle whose ``type`` matches a model class.

        The generator reads ``self.objects`` lazily, so the list must not be modified
        while the generator is being consumed.

        Args:
            object_type (Type[Stix2Object_T]): Model class; the default of its ``type``
                field is the value objects are matched against.

        Yields:
            Stix2Object_T: Each matching object, in bundle order.
        """
        expected_type = object_type.model_fields["type"].default
        for obj in self.objects:
            if obj.type == expected_type:
                yield obj

    def by_id(self, obj_id: str) -> Optional[Stix2Object_T]:
        """Return the first object of the bundle whose ``id`` equals ``obj_id``.

        Args:
            obj_id (str): The identifier to look for.

        Returns:
            Optional[Stix2Object_T]: The matching object, or None when there is none.
        """
        return next((obj for obj in self.objects if obj.id == obj_id), None)

    @staticmethod
    def load(raw_object: dict) -> "Stix2Bundle":
        """Build a bundle from a raw dictionary, keeping only what can be resolved.

        Objects whose ``type`` is missing or is not one of the supported types are
        ignored, and relationships whose ``source_ref`` or ``target_ref`` does not match
        the ``id`` of an object kept in the bundle are dropped. ``raw_object`` itself is
        left unmodified.

        Args:
            raw_object (dict): Raw bundle; its ``objects`` entry, when present, is a
                list of raw object dictionaries.

        Returns:
            Stix2Bundle: The validated bundle.
        """
        supported_types = Stix2ObjectBase.get_supported_types()
        supported_objects = [
            obj for obj in (raw_object.get("objects") or []) if obj.get("type") in supported_types
        ]

        # Validate a shallow copy so that the caller's dictionary is not rewritten.
        bundle = Stix2Bundle.model_validate({**raw_object, "objects": supported_objects})

        # Remove partially resolved relationships. The list is rebuilt in a single pass:
        # removing items while iterating over the same list skips the element that
        # follows each removal, which leaves some dangling relationships behind.
        known_ids = {obj.id for obj in bundle.objects}
        bundle.objects[:] = [
            obj
            for obj in bundle.objects
            if not isinstance(obj, Relationship)
            or (obj.source_ref in known_ids and obj.target_ref in known_ids)
        ]

        return bundle