1from typing import Dict, Optional, Union, List, Type, Any
2from uuid import uuid4
3
4from colander_data_converter.base.models import (
5 Actor,
6 Device,
7 Artifact,
8 Observable,
9 Threat,
10 Event,
11 DetectionRule,
12 DataFragment,
13 EntityRelation,
14 ColanderFeed,
15 ColanderRepository,
16 CommonEntitySuperType,
17 CommonEntitySuperTypes,
18 EntityTypes,
19 Entity,
20)
21from colander_data_converter.base.types.actor import *
22from colander_data_converter.base.types.artifact import *
23from colander_data_converter.base.types.device import *
24from colander_data_converter.base.types.observable import *
25from colander_data_converter.base.types.threat import *
26from colander_data_converter.converters.stix2.mapping import Stix2MappingLoader
27from colander_data_converter.converters.stix2.models import (
28 Stix2ObjectBase,
29 Stix2ObjectTypes,
30 Stix2Bundle,
31 Stix2Repository,
32 Relationship,
33)
34from colander_data_converter.converters.stix2.utils import (
35 extract_uuid_from_stix2_id,
36 get_nested_value,
37 set_nested_value,
38 extract_stix2_pattern_name,
39)
40
41
[docs]
42class Stix2Mapper:
43 """
44 Base class for mapping between STIX2 and Colander data using the mapping file.
45 """
46
[docs]
47 def __init__(self):
48 """
49 Initialize the mapper.
50 """
51 self.mapping_loader = Stix2MappingLoader()
52
53
[docs]
54class Stix2ToColanderMapper(Stix2Mapper):
55 """
56 Maps STIX2 data to Colander data using the mapping file.
57 """
58
[docs]
59 def convert(self, stix2_data: Dict[str, Any]) -> ColanderFeed:
60 """
61 Convert STIX2 data to Colander data.
62
63 Args:
64 stix2_data (Dict[str, Any]): The STIX2 data to convert.
65
66 Returns:
67 ColanderFeed: The converted Colander data.
68 """
69 repository = ColanderRepository()
70
71 # Keep track of processed STIX2 object IDs to handle duplicates
72 processed_ids: Dict[str, str] = {}
73
74 # Process STIX2 objects
75 for stix2_object in stix2_data.get("objects", []):
76 stix2_id = stix2_object.get("id", "")
77 stix2_type = stix2_object.get("type", "")
78
79 # Skip if this ID has already been processed with a different type
80 if stix2_id in processed_ids and processed_ids[stix2_id] != stix2_type:
81 # Generate a new UUID for this object to avoid overwriting
82 stix2_object = stix2_object.copy()
83 stix2_object["id"] = f"{stix2_type}--{uuid4()}"
84
85 colander_entity = self.convert_stix2_object(stix2_object)
86 if colander_entity:
87 repository << colander_entity
88 processed_ids[stix2_id] = stix2_type
89
90 # Handle object references
91 for stix2_object in stix2_data.get("objects", []):
92 stix2_id = stix2_object.get("id", "")
93 if stix2_id not in processed_ids:
94 continue
95 stix2_type = stix2_object.get("type", "")
96 if stix2_type == "relationship":
97 continue
98 for attr, value in stix2_object.items():
99 if attr.endswith("_ref"):
100 self._convert_reference(attr, stix2_id, value)
101 elif attr.endswith("_refs"):
102 for ref in stix2_object.get("refs", []):
103 self._convert_reference(attr, stix2_id, ref)
104
105 bundle_id = extract_uuid_from_stix2_id(stix2_data.get("id", ""))
106
107 feed_data = {
108 "id": bundle_id,
109 "name": stix2_data.get("name", "STIX2 Feed"),
110 "description": stix2_data.get("description", "Converted from STIX2"),
111 "entities": repository.entities,
112 "relations": repository.relations,
113 }
114
115 return ColanderFeed.model_validate(feed_data)
116
117 def _convert_reference(self, name: str, source_id: str, target_id: str) -> Optional[EntityRelation]:
118 if not name or not source_id or not target_id:
119 return None
120 relation_name = name.replace("_refs", "").replace("_ref", "").replace("_", " ")
121 source_object_id = extract_uuid_from_stix2_id(source_id)
122 target_object_id = extract_uuid_from_stix2_id(target_id)
123 source = ColanderRepository() >> source_object_id
124 target = ColanderRepository() >> target_object_id
125 if not source or not target:
126 return None
127 relation = EntityRelation(
128 name=relation_name,
129 obj_from=source,
130 obj_to=target,
131 )
132 ColanderRepository() << relation
133 return relation
134
[docs]
135 def convert_stix2_object(
136 self, stix2_object: Dict[str, Any]
137 ) -> Optional[
138 Union[Actor, Device, Artifact, Observable, Threat, Event, DetectionRule, DataFragment, EntityRelation]
139 ]:
140 """
141 Convert a STIX2 object to a Colander entity.
142
143 Args:
144 stix2_object (Dict[str, Any]): The STIX2 object to convert.
145
146 Returns:
147 Optional[Union[Actor, Device, Artifact, Observable, Threat, Event, DetectionRule, DataFragment, EntityRelation]]:
148 The converted Colander entity, or None if the object type is not supported.
149 """
150 stix2_type = stix2_object.get("type", "")
151
152 # Get the Colander entity type for this STIX2 type
153 entity_type, entity_subtype_candidates = self.mapping_loader.get_entity_type_for_stix2(stix2_type)
154
155 if entity_type and entity_subtype_candidates:
156 # Use the appropriate conversion method based on the entity type
157 if entity_type == "actor":
158 return self._convert_to_actor(stix2_object, entity_subtype_candidates)
159 elif entity_type == "device":
160 return self._convert_to_device(stix2_object, entity_subtype_candidates)
161 elif entity_type == "artifact":
162 return self._convert_to_artifact(stix2_object, entity_subtype_candidates)
163 elif entity_type == "observable":
164 return self._convert_to_observable(stix2_object, entity_subtype_candidates)
165 elif entity_type == "threat":
166 return self._convert_to_threat(stix2_object, entity_subtype_candidates)
167
168 # Handle relationship objects
169 if stix2_type == "relationship":
170 return self._convert_to_relation(stix2_object)
171
172 return None
173
174 def _convert_to_entity(
175 self,
176 stix2_object: Dict[str, Any],
177 model_class: Type["EntityTypes"],
178 colander_entity_type,
179 default_name: str = "Unknown Entity",
180 ) -> Any:
181 # Get the field mapping for the entity type
182 colander_entity_super_type: CommonEntitySuperType = CommonEntitySuperTypes.by_short_name(model_class.__name__)
183 field_mapping = self.mapping_loader.get_stix2_to_colander_field_mapping(model_class.__name__)
184
185 if not colander_entity_type:
186 raise ValueError("Invalid entity type")
187
188 # Create the base entity data
189 stix2_id = stix2_object.get("id", "")
190 extracted_uuid = extract_uuid_from_stix2_id(stix2_id)
191 entity_data = {
192 "id": extracted_uuid,
193 "name": stix2_object.get("name", default_name),
194 "description": stix2_object.get("description", ""),
195 "super_type": colander_entity_super_type,
196 "type": colander_entity_type,
197 "attributes": {},
198 }
199
200 # Apply the field mapping
201 for stix2_field, colander_field in field_mapping.items():
202 value = get_nested_value(stix2_object, stix2_field)
203 if value is not None:
204 if "." in colander_field:
205 # Handle nested fields
206 set_nested_value(entity_data, colander_field, value)
207 else:
208 entity_data[colander_field] = value
209
210 # Add any additional attributes from the STIX2 object
211 _ignore = ["id", "type"]
212 for key, value in stix2_object.items():
213 if key not in field_mapping and key not in _ignore and isinstance(value, (str, int, float, bool)):
214 entity_data["attributes"][key] = str(value)
215
216 try:
217 return model_class.model_validate(entity_data)
218 except Exception as e:
219 raise e
220
221 def _get_actor_type(self, stix2_object: Dict[str, Any], subtype_candidates: Optional[List[str]]) -> str:
222 default_type = ArtifactTypes.default.value.short_name.lower()
223 if not subtype_candidates:
224 return default_type
225
226 if stix2_object.get("type", "") == "threat-actor":
227 default_type = "threat_actor"
228 for subtype_candidate in subtype_candidates:
229 if subtype_candidate.lower() in stix2_object.get("threat_actor_types", []):
230 return subtype_candidate
231 return default_type
232
233 if len(subtype_candidates) == 1:
234 return subtype_candidates[0]
235
236 return default_type
237
238 def _convert_to_actor(self, stix2_object: Dict[str, Any], subtype_candidates: Optional[List[str]]) -> Actor:
239 _stix2_object = stix2_object.copy()
240 if "threat_actor_types" in _stix2_object and _stix2_object["threat_actor_types"] is not None:
241 _stix2_object["threat_actor_types"] = ",".join(_stix2_object["threat_actor_types"])
242
243 _actor_type = self._get_actor_type(_stix2_object, subtype_candidates)
244
245 return self._convert_to_entity(
246 stix2_object=_stix2_object,
247 model_class=Actor,
248 colander_entity_type=ActorTypes.by_short_name(_actor_type),
249 )
250
251 def _get_device_type(self, stix2_object: Dict[str, Any], subtype_candidates: Optional[List[str]]) -> str:
252 default_type = DeviceTypes.default.value.short_name.lower()
253 if not subtype_candidates:
254 return default_type
255
256 if len(subtype_candidates) == 1:
257 return subtype_candidates[0]
258
259 for subtype_candidate in subtype_candidates:
260 if subtype_candidate.lower() in stix2_object.get("infrastructure_types", []):
261 return subtype_candidate
262
263 return default_type
264
265 def _convert_to_device(self, stix2_object: Dict[str, Any], subtype_candidates: Optional[List[str]]) -> Device:
266 _stix2_object = stix2_object.copy()
267 if "infrastructure_types" in _stix2_object and _stix2_object["infrastructure_types"] is not None:
268 _stix2_object["infrastructure_types"] = ",".join(_stix2_object["infrastructure_types"])
269
270 _device_type = self._get_device_type(_stix2_object, subtype_candidates)
271 return self._convert_to_entity(
272 stix2_object=_stix2_object,
273 model_class=Device,
274 colander_entity_type=DeviceTypes.by_short_name(_device_type),
275 )
276
277 def _get_artifact_type(self, stix2_object: Dict[str, Any], subtype_candidates: Optional[List[str]]) -> str:
278 default_type = ArtifactTypes.default.value.short_name.lower()
279 if not subtype_candidates:
280 return default_type
281
282 artifact_type = ArtifactTypes.by_mime_type(stix2_object.get("mime_type", "unspecified")).short_name
283
284 return artifact_type or default_type
285
286 def _convert_to_artifact(self, stix2_object: Dict[str, Any], subtype_candidates: Optional[List[str]]) -> Artifact:
287 _artifact_type = self._get_artifact_type(stix2_object, subtype_candidates)
288
289 return self._convert_to_entity(
290 stix2_object=stix2_object,
291 model_class=Artifact,
292 colander_entity_type=ArtifactTypes.by_short_name(_artifact_type),
293 )
294
295 def _get_observable_type(self, stix2_object: Dict[str, Any], subtype_candidates: Optional[List[str]]) -> str:
296 default_type = ObservableTypes.default.value.short_name.lower()
297 if not subtype_candidates:
298 return default_type
299
300 _pattern_name = extract_stix2_pattern_name(stix2_object.get("pattern", "")) or "unspecified"
301 for _candidate in subtype_candidates:
302 _mapping = self.mapping_loader.get_entity_subtype_mapping("observable", _candidate)
303 if _pattern_name in _mapping["pattern"]:
304 return _candidate
305
306 # Return the generic subtype as it was not possible to narrow down the type selection
307 return default_type
308
309 def _convert_to_observable(
310 self, stix2_object: Dict[str, Any], subtype_candidates: Optional[List[str]]
311 ) -> Observable:
312 _observable_type = self._get_observable_type(stix2_object, subtype_candidates)
313 # Use the generic conversion method
314 return self._convert_to_entity(
315 stix2_object=stix2_object,
316 model_class=Observable,
317 colander_entity_type=ObservableTypes.by_short_name(_observable_type),
318 )
319
320 def _get_threat_type(self, stix2_object: Dict[str, Any], subtype_candidates: Optional[List[str]]) -> str:
321 default_type = ThreatTypes.default.value.short_name.lower()
322 if not subtype_candidates:
323 return default_type
324
325 for _candidate in subtype_candidates:
326 if _candidate in stix2_object.get("malware_types", []):
327 return _candidate
328
329 # Return the generic subtype as it was not possible to narrow down the type selection
330 return default_type
331
332 def _convert_to_threat(self, stix2_object: Dict[str, Any], subtype_candidates: Optional[List[str]]) -> Threat:
333 _threat_type = self._get_threat_type(stix2_object, subtype_candidates)
334 # Use the generic conversion method
335 return self._convert_to_entity(
336 stix2_object=stix2_object,
337 model_class=Threat,
338 colander_entity_type=ThreatTypes.by_short_name(_threat_type),
339 )
340
341 def _convert_to_relation(self, stix2_object: Dict[str, Any]) -> Optional[EntityRelation]:
342 relationship_type = stix2_object.get("relationship_type", "")
343 source_ref = stix2_object.get("source_ref", "")
344 target_ref = stix2_object.get("target_ref", "")
345
346 if not relationship_type or not source_ref or not target_ref:
347 return None
348
349 # Extract UUIDs from the references
350 source_id = extract_uuid_from_stix2_id(source_ref)
351 target_id = extract_uuid_from_stix2_id(target_ref)
352
353 if not source_id or not target_id:
354 return None
355
356 # Create the relation data
357 relation_data = {
358 "id": extract_uuid_from_stix2_id(stix2_object.get("id", "")),
359 "name": stix2_object.get("name", relationship_type),
360 "description": stix2_object.get("description", ""),
361 "created_at": stix2_object.get("created"),
362 "updated_at": stix2_object.get("modified"),
363 "obj_from": source_id,
364 "obj_to": target_id,
365 "attributes": {},
366 }
367
368 # Add any additional attributes from the STIX2 object
369 for key, value in stix2_object.items():
370 if key not in [
371 "id",
372 "type",
373 "name",
374 "description",
375 "created",
376 "modified",
377 "source_ref",
378 "target_ref",
379 ] and isinstance(value, (str, int, float, bool)):
380 relation_data["attributes"][key] = str(value)
381
382 return EntityRelation.model_validate(relation_data)
383
384
[docs]
385class ColanderToStix2Mapper(Stix2Mapper):
386 """
387 Maps Colander data to STIX2 data using the mapping file.
388 """
389
[docs]
390 def convert(self, colander_feed: ColanderFeed) -> Stix2Bundle:
391 stix2_data = {
392 "type": "bundle",
393 "id": f"bundle--{colander_feed.id or uuid4()}",
394 "spec_version": "2.1",
395 "objects": [],
396 }
397
398 # Convert entities
399 for _, entity in colander_feed.entities.items():
400 if not issubclass(entity.__class__, Entity):
401 continue
402 if entity.super_type.short_name.lower() not in self.mapping_loader.get_supported_colander_types():
403 continue
404 stix2_object = self.convert_colander_entity(entity)
405 if stix2_object:
406 stix2_data["objects"].append(stix2_object)
407
408 bundle = Stix2Bundle(**stix2_data)
409
410 # Extract and convert immutable relations
411 for _, entity in colander_feed.entities.items():
412 if not issubclass(entity.__class__, Entity):
413 continue
414 if entity.super_type.short_name.lower() not in self.mapping_loader.get_supported_colander_types():
415 continue
416 for _, relation in entity.get_immutable_relations(
417 mapping=self.mapping_loader.get_field_relationship_mapping(), default_name="related-to"
418 ).items():
419 stix2_object = self.convert_colander_relation(relation)
420 if stix2_object:
421 bundle.objects.append(Relationship(**stix2_object))
422
423 # Convert relations
424 for relation_id, relation in colander_feed.relations.items():
425 if isinstance(relation, EntityRelation):
426 stix2_object = self.convert_colander_relation(relation)
427 if stix2_object:
428 bundle.objects.append(Relationship(**stix2_object))
429
430 return bundle
431
[docs]
432 def convert_colander_entity(
433 self, entity: Union[Actor, Device, Artifact, Observable, Threat]
434 ) -> Optional[Dict[str, Any]]:
435 """
436 Convert a Colander entity to a STIX2 object.
437
438 Args:
439 entity: The Colander entity to convert.
440
441 Returns:
442 Optional[Dict[str, Any]]: The converted STIX2 object, or None if the entity type is not supported.
443 """
444 if isinstance(entity, Actor):
445 return self._convert_from_actor(entity)
446 elif isinstance(entity, Device):
447 return self._convert_from_device(entity)
448 elif isinstance(entity, Artifact):
449 return self._convert_from_artifact(entity)
450 elif isinstance(entity, Observable):
451 return self._convert_from_observable(entity)
452 elif isinstance(entity, Threat):
453 return self._convert_from_threat(entity)
454
455 return None
456
[docs]
457 def convert_colander_relation(self, relation: EntityRelation) -> Optional[Dict[str, Any]]:
458 """
459 Convert a Colander EntityRelation to a STIX2 relationship object.
460
461 Args:
462 relation (~colander_data_converter.base.models.EntityRelation): The Colander EntityRelation to convert.
463
464 Returns:
465 Optional[Dict[str, Any]]: The converted STIX2 relationship object, or None if the relation cannot be
466 converted.
467 """
468 return self._convert_from_relation(relation)
469
470 def _convert_from_entity(
471 self, entity: Any, additional_fields: Optional[Dict[str, Any]] = None
472 ) -> Optional[Stix2ObjectTypes]:
473 # Get the STIX2 type for the entity
474 stix2_type = self.mapping_loader.get_stix2_type_for_entity(entity)
475 if not stix2_type or (model_class := Stix2ObjectBase.get_model_class(stix2_type)) is None:
476 return None
477
478 # Get the field mapping for the entity type
479 entity_type = entity.get_super_type().short_name
480 field_mapping = self.mapping_loader.get_colander_to_stix2_field_mapping(entity_type)
481
482 # Create the base STIX2 object
483 stix2_object = {
484 "type": stix2_type,
485 "id": f"{stix2_type}--{entity.id}",
486 "created": entity.created_at.isoformat(),
487 "modified": entity.updated_at.isoformat(),
488 }
489
490 if "name" in model_class.model_fields:
491 stix2_object["name"] = entity.name
492
493 # Add any additional fields
494 if additional_fields:
495 stix2_object.update(additional_fields)
496
497 # Apply the field mapping
498 for colander_field, stix2_field in field_mapping.items():
499 value = get_nested_value(entity.model_dump(), colander_field)
500 if value is not None:
501 if "." in stix2_field:
502 # Handle nested fields
503 set_nested_value(stix2_object, stix2_field, value)
504 else:
505 stix2_object[stix2_field] = value
506
507 # Add any additional attributes
508 if hasattr(entity, "attributes") and entity.attributes:
509 for key, value in entity.attributes.items():
510 if key not in [field.split(".")[-1] for field in field_mapping.keys() if "." in field]:
511 stix2_object[key] = value
512
513 return model_class(**stix2_object)
514
515 def _convert_from_actor(self, actor: Actor) -> Optional[Dict[str, Any]]:
516 mapping = self.mapping_loader.get_actor_mapping(actor.type.short_name)
517 if not mapping:
518 return None
519 extra_attributes = self.mapping_loader.get_entity_extra_values("actor", actor.type.short_name)
520
521 return self._convert_from_entity(actor, extra_attributes)
522
523 def _convert_from_device(self, device: Device) -> Optional[Dict[str, Any]]:
524 mapping = self.mapping_loader.get_device_mapping(device.type.short_name)
525 if not mapping:
526 return None
527 extra_attributes = self.mapping_loader.get_entity_extra_values("device", device.type.short_name)
528 return self._convert_from_entity(device, extra_attributes)
529
530 def _convert_from_artifact(self, artifact: Artifact) -> Dict[str, Any]:
531 return self._convert_from_entity(artifact)
532
533 def _generate_observable_pattern(self, observable: Observable) -> Dict[str, Any]:
534 pattern_fields = {}
535
536 observable_type_short_name = observable.type.short_name.lower()
537 pattern_template = self.mapping_loader.get_observable_pattern(observable_type_short_name)
538 pattern_fields["pattern_type"] = "stix"
539 if pattern_template:
540 pattern_fields["pattern"] = pattern_template.format(value=observable.name)
541 # If the observable type is not found in the mapping, use a generic pattern
542 else:
543 pattern_fields["pattern"] = f"[unknown:value = '{observable.name}']"
544
545 return pattern_fields
546
547 def _convert_from_observable(self, observable: Observable) -> Dict[str, Any]:
548 # Generate pattern fields for the observable
549 pattern_fields = self._generate_observable_pattern(observable)
550 additional_fields = {}
551 additional_fields.update(pattern_fields)
552
553 # Add indicator_types to the additional fields
554 if observable.associated_threat is not None:
555 additional_fields.update({"indicator_types": ["malicious-activity"]})
556
557 return self._convert_from_entity(observable, additional_fields)
558
559 def _get_threat_malware_types(self, threat: Threat) -> Dict[str, Any]:
560 additional_fields = {}
561
562 # Get the STIX2 type for threats
563 stix2_type = self.mapping_loader.get_stix2_type_for_entity(threat)
564
565 # Add malware_types if the type is malware
566 if stix2_type == "malware":
567 threat_type_short_name = threat.type.short_name.lower()
568 malware_types = self.mapping_loader.get_malware_types_for_threat(threat_type_short_name)
569 if malware_types:
570 additional_fields["malware_types"] = malware_types
571 else:
572 additional_fields["malware_types"] = ["unknown", threat_type_short_name]
573
574 return additional_fields
575
576 def _convert_from_threat(self, threat: Threat) -> Dict[str, Any]:
577 additional_fields = self._get_threat_malware_types(threat)
578 return self._convert_from_entity(threat, additional_fields)
579
580 def _convert_from_relation(self, relation: EntityRelation) -> Optional[Dict[str, Any]]:
581 if not relation.obj_from or not relation.obj_to:
582 return None
583
584 if not relation.is_fully_resolved():
585 return None
586
587 supported_types = self.mapping_loader.get_supported_colander_types()
588 if (
589 relation.obj_from.super_type.short_name.lower() not in supported_types
590 or relation.obj_to.super_type.short_name.lower() not in supported_types
591 ):
592 return None
593
594 source_prefix = self.mapping_loader.get_stix2_type_for_entity(relation.obj_from) or "unknown"
595 target_prefix = self.mapping_loader.get_stix2_type_for_entity(relation.obj_to) or "unknown"
596 source_ref = f"{source_prefix}--{relation.obj_from.id}"
597 target_ref = f"{target_prefix}--{relation.obj_to.id}"
598 repository = Stix2Repository()
599 source = repository >> source_ref
600 target = repository >> target_ref
601
602 if not source or not target:
603 return None
604
605 # Create the base STIX2 relationship object
606 stix2_object = {
607 "type": "relationship",
608 "id": f"relationship--{relation.id}",
609 "relationship_type": relation.name.replace(" ", "-"),
610 "created": relation.created_at.isoformat(),
611 "modified": relation.updated_at.isoformat(),
612 "source_ref": f"{source_prefix}--{relation.obj_from.id}",
613 "target_ref": f"{target_prefix}--{relation.obj_to.id}",
614 }
615
616 # Add any additional attributes
617 if hasattr(relation, "attributes") and relation.attributes:
618 for key, value in relation.attributes.items():
619 stix2_object[key] = value
620
621 return stix2_object
622
623
[docs]
624class Stix2Converter:
625 """
626 Converter for STIX2 data to Colander data and vice versa.
627 Uses the mapping file to convert between formats.
628 """
629
[docs]
630 @staticmethod
631 def stix2_to_colander(stix2_data: Dict[str, Any]) -> ColanderFeed:
632 """
633 Converts STIX2 data to Colander data using the mapping file.
634
635 Args:
636 stix2_data (Dict[str, Any]): The STIX2 data to convert.
637
638 Returns:
639 ColanderFeed: The converted Colander data.
640 """
641 mapper = Stix2ToColanderMapper()
642 return mapper.convert(stix2_data)
643
[docs]
644 @staticmethod
645 def colander_to_stix2(colander_feed: ColanderFeed) -> Stix2Bundle:
646 """
647 Converts Colander data to STIX2 data using the mapping file.
648
649 Args:
650 colander_feed (ColanderFeed): The Colander data to convert.
651
652 Returns:
653 Stix2Bundle: The converted STIX2 bundle.
654 """
655 mapper = ColanderToStix2Mapper()
656 colander_feed.resolve_references()
657 return mapper.convert(colander_feed)