from datetime import datetime, UTC
from typing import Dict, Any, Optional, Union, List
from uuid import uuid4
from colander_data_converter.base.models import (
Actor,
Device,
Artifact,
Observable,
Threat,
Event,
DetectionRule,
DataFragment,
EntityRelation,
ColanderFeed,
ColanderRepository,
DeviceTypes,
ArtifactTypes,
ObservableTypes,
ThreatTypes,
CommonEntitySuperType,
CommonEntitySuperTypes,
ActorTypes,
)
from colander_data_converter.formats.stix2.mapping import Stix2MappingLoader
from colander_data_converter.formats.stix2.utils import (
extract_uuid_from_stix2_id,
get_nested_value,
set_nested_value,
extract_stix2_pattern_name,
)
[docs]
class Stix2Mapper:
"""
Base class for mapping between STIX2 and Colander data using the mapping file.
"""
def __init__(self):
"""
Initialize the mapper.
"""
self.mapping_loader = Stix2MappingLoader()
[docs]
class Stix2ToColanderMapper(Stix2Mapper):
"""
Maps STIX2 data to Colander data using the mapping file.
"""
[docs]
def convert(self, stix2_data: Dict[str, Any]) -> ColanderFeed:
"""
Convert STIX2 data to Colander data.
Args:
stix2_data (Dict[str, Any]): The STIX2 data to convert.
Returns:
ColanderFeed: The converted Colander data.
"""
repository = ColanderRepository()
# Keep track of processed STIX2 object IDs to handle duplicates
processed_ids: Dict[str, str] = {}
# Process STIX2 objects
for stix2_object in stix2_data.get("objects", []):
stix2_id = stix2_object.get("id", "")
stix2_type = stix2_object.get("type", "")
# Skip if this ID has already been processed with a different type
if stix2_id in processed_ids and processed_ids[stix2_id] != stix2_type:
# Generate a new UUID for this object to avoid overwriting
stix2_object = stix2_object.copy()
stix2_object["id"] = f"{stix2_type}--{uuid4()}"
colander_entity = self.convert_stix2_object(stix2_object)
if colander_entity:
repository << colander_entity
processed_ids[stix2_id] = stix2_type
bundle_id = extract_uuid_from_stix2_id(stix2_data.get("id", ""))
feed_data = {
"id": bundle_id,
"name": stix2_data.get("name", "STIX2 Feed"),
"description": stix2_data.get("description", "Converted from STIX2"),
"entities": repository.entities,
"relations": repository.relations,
}
return ColanderFeed.model_validate(feed_data)
[docs]
def convert_stix2_object(
self, stix2_object: Dict[str, Any]
) -> Optional[
Union[Actor, Device, Artifact, Observable, Threat, Event, DetectionRule, DataFragment, EntityRelation]
]:
"""
Convert a STIX2 object to a Colander entity.
Args:
stix2_object (Dict[str, Any]): The STIX2 object to convert.
Returns:
Optional[Union[Actor, Device, Artifact, Observable, Threat, Event, DetectionRule, DataFragment, EntityRelation]]:
The converted Colander entity, or None if the object type is not supported.
"""
stix2_type = stix2_object.get("type", "")
# Get the Colander entity type for this STIX2 type
entity_type, entity_subtype_candidates = self.mapping_loader.get_entity_type_for_stix2(stix2_type)
if entity_type and entity_subtype_candidates:
# Use the appropriate conversion method based on the entity type
if entity_type == "actor":
return self._convert_to_actor(stix2_object, entity_subtype_candidates)
elif entity_type == "device":
return self._convert_to_device(stix2_object, entity_subtype_candidates)
elif entity_type == "artifact":
return self._convert_to_artifact(stix2_object, entity_subtype_candidates)
elif entity_type == "observable":
return self._convert_to_observable(stix2_object, entity_subtype_candidates)
elif entity_type == "threat":
return self._convert_to_threat(stix2_object, entity_subtype_candidates)
# Handle relationship objects
if stix2_type == "relationship":
return self._convert_to_relation(stix2_object)
return None
def _convert_to_entity(
self,
stix2_object: Dict[str, Any],
model_class: type,
colander_entity_type,
default_name: str = "Unknown Entity",
) -> Any:
"""
Converts a STIX2-compliant dictionary object to a specific entity model representation
using the provided mapping and model class. This function extracts relevant fields,
maps them to the target entity structure, and validates the final structure using the
`model_class`.
Parameters:
stix2_object (Dict[str, Any]): The input dictionary adhering to the STIX2 format.
It contains raw data that will be converted into the specific entity model.
model_class (type): The target model class to which the STIX2 object will be
converted. The model class must support a `model_validate` method for validation.
colander_entity_type: The specific entity type that the converted object should
adhere to. This is used for determining the final type of the entity.
default_name (str): A default name to assign to the entity if the "name" field
is not present in the provided STIX2 object. Default is "Unknown Entity".
Returns:
Any: The validated and converted entity object as specified by the `model_class`.
Raises:
ValueError: If the `colander_entity_type` parameter is invalid.
Exception: If the `model_class.model_validate` method raises an error during
validation of the final converted entity structure.
"""
# Get the field mapping for the entity type
colander_entity_super_type: CommonEntitySuperType = CommonEntitySuperTypes.by_short_name(model_class.__name__)
field_mapping = self.mapping_loader.get_stix2_to_colander_field_mapping(model_class.__name__)
if not colander_entity_type:
raise ValueError("Invalid entity type")
# Create the base entity data
stix2_id = stix2_object.get("id", "")
extracted_uuid = extract_uuid_from_stix2_id(stix2_id)
entity_data = {
"id": extracted_uuid,
"name": stix2_object.get("name", default_name),
"description": stix2_object.get("description", ""),
"super_type": colander_entity_super_type,
"type": colander_entity_type,
"attributes": {},
}
# Apply the field mapping
for stix2_field, colander_field in field_mapping.items():
value = get_nested_value(stix2_object, stix2_field)
if value is not None:
if "." in colander_field:
# Handle nested fields
set_nested_value(entity_data, colander_field, value)
else:
entity_data[colander_field] = value
# Add any additional attributes from the STIX2 object
_ignore = ["id", "type"]
for key, value in stix2_object.items():
if key not in field_mapping and key not in _ignore and isinstance(value, (str, int, float, bool)):
entity_data["attributes"][key] = str(value)
try:
return model_class.model_validate(entity_data)
except Exception as e:
raise e
def _get_actor_type(self, stix2_object: Dict[str, Any], subtype_candidates: Optional[List[str]]) -> str:
"""
Determines the actor type based on a given STIX 2.0 object and a list of subtype candidates.
This method analyzes the type of a STIX 2.0 object and checks for matches within
its "threat_actor_types" field against a provided list of subtype candidates. If a subtype
candidate matches, it returns the matching candidate. If no match is found, the method
assigns a default type depending on the object's type and other conditions.
Parameters:
stix2_object (Dict[str, Any]): The STIX 2.0 object to evaluate.
subtype_candidates (Optional[List[str]]): A list containing possible subtype options
for the STIX 2.0 object.
Returns:
str: The determined actor type based on the input object and subtype candidates.
"""
default_type = ArtifactTypes.default.short_name.lower()
if not subtype_candidates:
return default_type
if stix2_object.get("type", "") == "threat-actor":
default_type = "threat_actor"
for subtype_candidate in subtype_candidates:
if subtype_candidate.lower() in stix2_object.get("threat_actor_types", []):
return subtype_candidate
return default_type
if len(subtype_candidates) == 1:
return subtype_candidates[0]
return default_type
def _convert_to_actor(self, stix2_object: Dict[str, Any], subtype_candidates: Optional[List[str]]) -> Actor:
"""
Convert a STIX2 object to a Colander Actor entity.
Args:
stix2_object (Dict[str, Any]): The STIX2 object to convert.
subtype_candidates (Optional[List[str]]): A list providing
potential subtype candidates to aid in subtype resolution during the
conversion process.
Returns:
Actor: The converted Colander Actor entity.
"""
_stix2_object = stix2_object.copy()
if "threat_actor_types" in _stix2_object and _stix2_object["threat_actor_types"] is not None:
_stix2_object["threat_actor_types"] = ",".join(_stix2_object["threat_actor_types"])
_actor_type = self._get_actor_type(_stix2_object, subtype_candidates)
return self._convert_to_entity(
stix2_object=_stix2_object,
model_class=Actor,
colander_entity_type=ActorTypes.by_short_name(_actor_type),
)
def _get_device_type(self, stix2_object: Dict[str, Any], subtype_candidates: Optional[List[str]]) -> str:
"""
Determines the device type from the provided STIX object and subtype candidates.
This method analyzes a given STIX 2.0 object and a list of subtype candidates to determine
the most appropriate device type. If there is exactly one subtype candidate, it is returned directly.
Otherwise, the method checks if each candidate matches an infrastructure type in the STIX object.
If no matching subtype is found, a default type of "generic" is returned.
The method assumes that the `infrastructure_types` field in the STIX object contains the relevant
information for matching.
Args:
stix2_object (Dict[str, Any]): A dictionary representing a STIX 2.0 object.
subtype_candidates (Optional[List[str]]): A list of strings representing potential device subtypes.
Returns:
str: The determined device type, or "generic" if no suitable subtype is found.
"""
default_type = DeviceTypes.default.short_name.lower()
if not subtype_candidates:
return default_type
if len(subtype_candidates) == 1:
return subtype_candidates[0]
for subtype_candidate in subtype_candidates:
if subtype_candidate.lower() in stix2_object.get("infrastructure_types", []):
return subtype_candidate
return default_type
def _convert_to_device(self, stix2_object: Dict[str, Any], subtype_candidates: Optional[List[str]]) -> Device:
"""
Convert a STIX2 object to a Colander Device entity.
Args:
stix2_object (Dict[str, Any]): The STIX2 object to convert.
subtype_candidates (Optional[List[str]]): A list providing
potential subtype candidates to aid in subtype resolution during the
conversion process.
Returns:
Device: The converted Colander Device entity.
"""
_stix2_object = stix2_object.copy()
if "infrastructure_types" in _stix2_object and _stix2_object["infrastructure_types"] is not None:
_stix2_object["infrastructure_types"] = ",".join(_stix2_object["infrastructure_types"])
_device_type = self._get_device_type(_stix2_object, subtype_candidates)
return self._convert_to_entity(
stix2_object=_stix2_object,
model_class=Device,
colander_entity_type=DeviceTypes.by_short_name(_device_type),
)
def _get_artifact_type(self, stix2_object: Dict[str, Any], subtype_candidates: Optional[List[str]]) -> str:
"""
Determines the artifact type for a given STIX 2 object based on its MIME type.
This method evaluates the MIME type of a provided STIX 2 object and attempts to
map it to a corresponding artifact type using a predefined mapping. If no
specific artifact type mapping is found, a default type is returned.
Args:
stix2_object (Dict[str, Any]): A dictionary representing a STIX 2 object.
It is expected to contain a "mime_type" key indicating the MIME type
of the object. If "mime_type" is not present, "unspecified" is used
as a default value.
subtype_candidates (Optional[List[str]]): A list of possible subtype candidates
for classification. This parameter is not currently utilized by the method.
Returns:
str: The resolved artifact type based on the MIME type of the STIX 2 object,
or a default artifact type ("generic") if no specific mapping is available.
"""
default_type = ArtifactTypes.default.short_name.lower()
if not subtype_candidates:
return default_type
artifact_type = ArtifactTypes.by_mime_type(stix2_object.get("mime_type", "unspecified")).short_name
return artifact_type or default_type
def _convert_to_artifact(self, stix2_object: Dict[str, Any], subtype_candidates: Optional[List[str]]) -> Artifact:
"""
Converts a given STIX 2.0 object into an Artifact entity.
This function transforms a STIX 2.0 object into an internal Artifact entity
representation. It uses the provided subtype candidates to assist with resolving
the proper subtype for the Artifact entity when converting it.
Args:
stix2_object (Dict[str, Any]): The input STIX 2.0 object containing the data
to transform into an Artifact.
subtype_candidates (Optional[List[str]]): A list providing
potential subtype candidates to aid in subtype resolution during the
conversion process.
Returns:
Artifact: The resulting Artifact entity converted from the STIX 2.0 object.
"""
_artifact_type = self._get_artifact_type(stix2_object, subtype_candidates)
return self._convert_to_entity(
stix2_object=stix2_object,
model_class=Artifact,
colander_entity_type=ArtifactTypes.by_short_name(_artifact_type),
)
def _get_observable_type(self, stix2_object: Dict[str, Any], subtype_candidates: Optional[List[str]]) -> str:
default_type = ObservableTypes.default.short_name.lower()
if not subtype_candidates:
return default_type
_pattern_name = extract_stix2_pattern_name(stix2_object.get("pattern", "")) or "unspecified"
for _candidate in subtype_candidates:
_mapping = self.mapping_loader.get_entity_subtype_mapping("observable", _candidate)
if _pattern_name in _mapping["pattern"]:
return _candidate
# Return the generic subtype as it was not possible to narrow down the type selection
return default_type
def _convert_to_observable(
self, stix2_object: Dict[str, Any], subtype_candidates: Optional[List[str]]
) -> Observable:
"""
Convert a STIX2 object to a Colander Observable entity.
Args:
stix2_object (Dict[str, Any]): The STIX2 object to convert.
subtype_candidates (Optional[List[str]]): A list providing
potential subtype candidates to aid in subtype resolution during the
conversion process.
Returns:
Observable: The converted Colander Observable entity.
"""
_observable_type = self._get_observable_type(stix2_object, subtype_candidates)
# Use the generic conversion method
return self._convert_to_entity(
stix2_object=stix2_object,
model_class=Observable,
colander_entity_type=ObservableTypes.by_short_name(_observable_type),
)
def _get_threat_type(self, stix2_object: Dict[str, Any], subtype_candidates: Optional[List[str]]) -> str:
default_type = ThreatTypes.default.short_name.lower()
if not subtype_candidates:
return default_type
for _candidate in subtype_candidates:
if _candidate in stix2_object.get("malware_types", []):
return _candidate
# Return the generic subtype as it was not possible to narrow down the type selection
return default_type
def _convert_to_threat(self, stix2_object: Dict[str, Any], subtype_candidates: Optional[List[str]]) -> Threat:
"""
Convert a STIX2 object to a Colander Threat entity.
Args:
stix2_object (Dict[str, Any]): The STIX2 object to convert.
subtype_candidates (Optional[List[str]]): A list providing
potential subtype candidates to aid in subtype resolution during the
conversion process.
Returns:
Threat: The converted Colander Threat entity.
"""
_threat_type = self._get_threat_type(stix2_object, subtype_candidates)
# Use the generic conversion method
return self._convert_to_entity(
stix2_object=stix2_object,
model_class=Threat,
colander_entity_type=ThreatTypes.by_short_name(_threat_type),
)
def _convert_to_relation(self, stix2_object: Dict[str, Any]) -> Optional[EntityRelation]:
"""
Convert a STIX2 relationship object to a Colander EntityRelation.
Args:
stix2_object (Dict[str, Any]): The STIX2 relationship object to convert.
Returns:
Optional[EntityRelation]: The converted Colander EntityRelation, or None if the relationship is not valid.
"""
relationship_type = stix2_object.get("relationship_type", "")
source_ref = stix2_object.get("source_ref", "")
target_ref = stix2_object.get("target_ref", "")
if not relationship_type or not source_ref or not target_ref:
return None
# Extract UUIDs from the references
source_id = extract_uuid_from_stix2_id(source_ref)
target_id = extract_uuid_from_stix2_id(target_ref)
if not source_id or not target_id:
return None
# Create the relation data
relation_data = {
"id": extract_uuid_from_stix2_id(stix2_object.get("id", "")),
"name": stix2_object.get("name", relationship_type),
"description": stix2_object.get("description", ""),
"created_at": stix2_object.get("created"),
"updated_at": stix2_object.get("modified"),
"obj_from": source_id,
"obj_to": target_id,
"attributes": {},
}
# Add any additional attributes from the STIX2 object
for key, value in stix2_object.items():
if key not in [
"id",
"type",
"name",
"description",
"created",
"modified",
"source_ref",
"target_ref",
] and isinstance(value, (str, int, float, bool)):
relation_data["attributes"][key] = str(value)
return EntityRelation.model_validate(relation_data)
[docs]
class ColanderToStix2Mapper(Stix2Mapper):
"""
Maps Colander data to STIX2 data using the mapping file.
"""
[docs]
def convert(self, colander_feed: ColanderFeed) -> Dict[str, Any]:
"""
Convert Colander data to STIX2 data.
Args:
colander_feed (ColanderFeed): The Colander data to convert.
Returns:
Dict[str, Any]: The converted STIX2 data.
"""
stix2_data = {"type": "bundle", "id": f"bundle--{uuid4()}", "spec_version": "2.1", "objects": []}
# Convert entities
if hasattr(colander_feed, "entities"):
for entity_id, entity in colander_feed.entities.items():
stix2_object = self.convert_colander_entity(entity)
if stix2_object:
stix2_data["objects"].append(stix2_object)
# Extract and convert ObjectReference relationships
ref_relationships = self._extract_object_reference_relationships(entity)
for rel in ref_relationships:
stix2_data["objects"].append(rel)
# Convert relations
if hasattr(colander_feed, "relations"):
for relation_id, relation in colander_feed.relations.items():
if isinstance(relation, EntityRelation):
stix2_object = self.convert_colander_relation(relation)
if stix2_object:
stix2_data["objects"].append(stix2_object)
return stix2_data
[docs]
def convert_colander_entity(
self, entity: Union[Actor, Device, Artifact, Observable, Threat, DetectionRule, DataFragment]
) -> Optional[Dict[str, Any]]:
"""
Convert a Colander entity to a STIX2 object.
Args:
entity: The Colander entity to convert.
Returns:
Optional[Dict[str, Any]]: The converted STIX2 object, or None if the entity type is not supported.
"""
if isinstance(entity, Actor):
return self._convert_from_actor(entity)
elif isinstance(entity, Device):
return self._convert_from_device(entity)
elif isinstance(entity, Artifact):
return self._convert_from_artifact(entity)
elif isinstance(entity, Observable):
return self._convert_from_observable(entity)
elif isinstance(entity, Threat):
return self._convert_from_threat(entity)
return None
[docs]
def convert_colander_relation(self, relation: EntityRelation) -> Optional[Dict[str, Any]]:
"""
Convert a Colander EntityRelation to a STIX2 relationship object.
Args:
relation (EntityRelation): The Colander EntityRelation to convert.
Returns:
Optional[Dict[str, Any]]: The converted STIX2 relationship object, or None if the relation cannot be converted.
"""
return self._convert_from_relation(relation)
def _convert_from_entity(
self, entity: Any, entity_type: str, additional_fields: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Generic method to convert a Colander entity to a STIX2 object.
Args:
entity (Any): The Colander entity to convert.
entity_type (str): The type of entity being converted (e.g., "actor", "device").
additional_fields (Optional[Dict[str, Any]], optional): Additional fields to add to the STIX2 object.
Returns:
Dict[str, Any]: The converted STIX2 object.
"""
# Get the STIX2 type for the entity
stix2_type = self.mapping_loader.get_stix2_type_for_entity(entity_type)
# Get the field mapping for the entity type
field_mapping = self.mapping_loader.get_colander_to_stix2_field_mapping(entity_type)
# Create the base STIX2 object
stix2_object = {
"type": stix2_type,
"id": f"{stix2_type}--{entity.id}",
"created": entity.created_at.isoformat()
if hasattr(entity, "created_at") and entity.created_at
else datetime.now(UTC).isoformat(),
"modified": entity.updated_at.isoformat()
if hasattr(entity, "updated_at") and entity.updated_at
else datetime.now(UTC).isoformat(),
}
# Add any additional fields
if additional_fields:
stix2_object.update(additional_fields)
# Apply the field mapping
for colander_field, stix2_field in field_mapping.items():
value = get_nested_value(entity.model_dump(), colander_field)
if value is not None:
if "." in stix2_field:
# Handle nested fields
set_nested_value(stix2_object, stix2_field, value)
else:
stix2_object[stix2_field] = value
# Add any additional attributes
if hasattr(entity, "attributes") and entity.attributes:
for key, value in entity.attributes.items():
if key not in [field.split(".")[-1] for field in field_mapping.keys() if "." in field]:
stix2_object[key] = value
return stix2_object
def _convert_from_entity_by_type(
self, entity: Any, entity_type: str, additional_fields: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Generic method to convert a specific Colander entity type to a STIX2 object.
Args:
entity (Any): The Colander entity to convert.
entity_type (str): The type of entity being converted (e.g., "actor", "device").
additional_fields (Optional[Dict[str, Any]], optional): Additional fields to add to the STIX2 object.
Returns:
Dict[str, Any]: The converted STIX2 object.
"""
return self._convert_from_entity(entity, entity_type, additional_fields)
def _convert_from_actor(self, actor: Actor) -> Dict[str, Any]:
"""
Convert a Colander Actor entity to a STIX2 object.
Args:
actor (Actor): The Colander Actor entity to convert.
Returns:
Dict[str, Any]: The converted STIX2 object.
"""
return self._convert_from_entity_by_type(actor, "actor")
def _convert_from_device(self, device: Device) -> Dict[str, Any]:
"""
Convert a Colander Device entity to a STIX2 object.
Args:
device (Device): The Colander Device entity to convert.
Returns:
Dict[str, Any]: The converted STIX2 object.
"""
return self._convert_from_entity_by_type(device, "device")
def _convert_from_artifact(self, artifact: Artifact) -> Dict[str, Any]:
"""
Convert a Colander Artifact entity to a STIX2 object.
Args:
artifact (Artifact): The Colander Artifact entity to convert.
Returns:
Dict[str, Any]: The converted STIX2 object.
"""
return self._convert_from_entity_by_type(artifact, "artifact")
def _generate_observable_pattern(self, observable: Observable) -> Dict[str, Any]:
"""
Generate a pattern for an observable based on its type and value.
Args:
observable (Observable): The observable to generate a pattern for.
Returns:
Dict[str, Any]: A dictionary containing pattern and pattern_type.
"""
pattern_fields = {}
if hasattr(observable, "type") and observable.type:
observable_type_short_name = observable.type.short_name.lower()
try:
pattern_template = self.mapping_loader.get_pattern_template(observable_type_short_name)
pattern_type = self.mapping_loader.get_pattern_type(observable_type_short_name)
if pattern_template and observable.name:
pattern_fields["pattern"] = pattern_template.format(value=observable.name)
pattern_fields["pattern_type"] = pattern_type
except ValueError:
# If the observable type is not found in the mapping, use a generic pattern
pattern_fields["pattern"] = f"[unknown:value = '{observable.name}']"
pattern_fields["pattern_type"] = "stix"
return pattern_fields
def _convert_from_observable(self, observable: Observable) -> Dict[str, Any]:
"""
Convert a Colander Observable entity to a STIX2 object.
Args:
observable (Observable): The Colander Observable entity to convert.
Returns:
Dict[str, Any]: The converted STIX2 object.
"""
# Generate pattern fields for the observable
pattern_fields = self._generate_observable_pattern(observable)
# Add indicator_types to the additional fields
additional_fields = {"indicator_types": ["malicious-activity"]}
additional_fields.update(pattern_fields)
return self._convert_from_entity_by_type(observable, "observable", additional_fields)
def _get_threat_malware_types(self, threat: Threat) -> Dict[str, Any]:
"""
Get the malware types for a threat based on its type.
Args:
threat (Threat): The threat to get malware types for.
Returns:
Dict[str, Any]: A dictionary containing malware_types.
"""
additional_fields = {}
# Get the STIX2 type for threats
stix2_type = self.mapping_loader.get_stix2_type_for_entity("threat")
# Add malware_types if the type is malware
if stix2_type == "malware" and hasattr(threat, "type") and threat.type:
threat_type_short_name = threat.type.short_name.lower()
try:
malware_types = self.mapping_loader.get_malware_types_for_threat(threat_type_short_name)
if malware_types:
additional_fields["malware_types"] = malware_types
else:
additional_fields["malware_types"] = "unknown"
except ValueError:
additional_fields["malware_types"] = "unknown"
return additional_fields
def _convert_from_threat(self, threat: Threat) -> Dict[str, Any]:
"""
Convert a Colander Threat entity to a STIX2 object.
Args:
threat (Threat): The Colander Threat entity to convert.
Returns:
Dict[str, Any]: The converted STIX2 object.
"""
# Get additional fields for the threat
additional_fields = self._get_threat_malware_types(threat)
return self._convert_from_entity_by_type(threat, "threat", additional_fields)
def _extract_object_reference_relationships(self, entity: Any) -> list:
"""
Extract and create STIX2 relationship objects from ObjectReference attributes in a Colander entity.
Args:
entity (Any): The Colander entity to extract relationships from.
Returns:
list: A list of STIX2 relationship objects.
"""
from typing import get_args
from colander_data_converter.base.common import ObjectReference
from uuid import UUID
relationships = []
# Get the entity's STIX2 type
entity_type = None
if isinstance(entity, Actor):
entity_type = "actor"
elif isinstance(entity, Device):
entity_type = "device"
elif isinstance(entity, Artifact):
entity_type = "artifact"
elif isinstance(entity, Observable):
entity_type = "observable"
elif isinstance(entity, Threat):
entity_type = "threat"
if not entity_type:
return relationships
stix2_type = self.mapping_loader.get_stix2_type_for_entity(entity_type)
if not stix2_type:
return relationships
# Inspect the entity's fields for ObjectReference attributes
for field_name, field_info in entity.__class__.model_fields.items():
annotation_args = get_args(field_info.annotation)
# Check if this field is an ObjectReference
if ObjectReference in annotation_args:
ref_value = getattr(entity, field_name, None)
if ref_value and isinstance(ref_value, UUID):
# Create a relationship based on the field name
relationship_type = self._determine_relationship_type(field_name)
# Get the target entity type
target_entity = ColanderRepository() >> ref_value
if target_entity and not isinstance(target_entity, UUID):
target_type = self._get_entity_stix2_type(target_entity)
if target_type:
relationship = {
"type": "relationship",
"id": f"relationship--{uuid4()}",
"created": datetime.now(UTC).isoformat(),
"modified": datetime.now(UTC).isoformat(),
"relationship_type": relationship_type,
"source_ref": f"{stix2_type}--{entity.id}",
"target_ref": f"{target_type}--{ref_value}",
}
relationships.append(relationship)
# Check if this field is a List[ObjectReference]
elif any(
hasattr(arg, "__origin__") and arg.__origin__ is list and ObjectReference in get_args(arg)
for arg in annotation_args
):
ref_values = getattr(entity, field_name, [])
if ref_values and isinstance(ref_values, list):
relationship_type = self._determine_relationship_type(field_name)
for ref_value in ref_values:
if isinstance(ref_value, UUID):
# Get the target entity type
target_entity = ColanderRepository() >> ref_value
if target_entity and not isinstance(target_entity, UUID):
target_type = self._get_entity_stix2_type(target_entity)
if target_type:
relationship = {
"type": "relationship",
"id": f"relationship--{uuid4()}",
"created": datetime.now(UTC).isoformat(),
"modified": datetime.now(UTC).isoformat(),
"relationship_type": relationship_type,
"source_ref": f"{stix2_type}--{entity.id}",
"target_ref": f"{target_type}--{ref_value}",
}
relationships.append(relationship)
return relationships
def _determine_relationship_type(self, field_name: str) -> str:
"""
Determine the STIX2 relationship type based on the field name.
Args:
field_name (str): The name of the field.
Returns:
str: The STIX2 relationship type.
"""
return self.mapping_loader.get_field_relationship_type(field_name)
def _get_entity_stix2_type(self, entity: Any) -> Optional[str]:
"""
Get the STIX2 type for a Colander entity.
Args:
entity (Any): The Colander entity.
Returns:
Optional[str]: The STIX2 type, or None if not found.
"""
if isinstance(entity, Actor):
return self.mapping_loader.get_stix2_type_for_entity("actor")
elif isinstance(entity, Device):
return self.mapping_loader.get_stix2_type_for_entity("device")
elif isinstance(entity, Artifact):
return self.mapping_loader.get_stix2_type_for_entity("artifact")
elif isinstance(entity, Observable):
return self.mapping_loader.get_stix2_type_for_entity("observable")
elif isinstance(entity, Threat):
return self.mapping_loader.get_stix2_type_for_entity("threat")
return None
def _convert_from_relation(self, relation: EntityRelation) -> Optional[Dict[str, Any]]:
"""
Convert a Colander EntityRelation to a STIX2 relationship object.
Args:
relation (EntityRelation): The Colander EntityRelation to convert.
Returns:
Optional[Dict[str, Any]]: The converted STIX2 relationship object, or None if the relation cannot be converted.
"""
if not relation.obj_from or not relation.obj_to:
return None
# Create the base STIX2 relationship object
stix2_object = {
"type": "relationship",
"id": f"relationship--{relation.id}",
"created": relation.created_at.isoformat()
if hasattr(relation, "created_at") and relation.created_at
else datetime.now(UTC).isoformat(),
"modified": relation.updated_at.isoformat()
if hasattr(relation, "updated_at") and relation.updated_at
else datetime.now(UTC).isoformat(),
"source_ref": f"unknown--{relation.source_id}", # ToDo: placeholder, will be updated if source entity is found
"target_ref": f"unknown--{relation.target_id}", # ToDo: placeholder, will be updated if target entity is found
}
# Add any additional attributes
if hasattr(relation, "attributes") and relation.attributes:
for key, value in relation.attributes.items():
stix2_object[key] = value
return stix2_object