Source code for colander_data_converter.converters.stix2.converter

  1from typing import Dict, Optional, Union, List, Type, Any
  2from uuid import uuid4
  3
  4from colander_data_converter.base.models import (
  5    Actor,
  6    Device,
  7    Artifact,
  8    Observable,
  9    Threat,
 10    Event,
 11    DetectionRule,
 12    DataFragment,
 13    EntityRelation,
 14    ColanderFeed,
 15    ColanderRepository,
 16    CommonEntitySuperType,
 17    CommonEntitySuperTypes,
 18    EntityTypes,
 19    Entity,
 20)
 21from colander_data_converter.base.types.actor import *
 22from colander_data_converter.base.types.artifact import *
 23from colander_data_converter.base.types.device import *
 24from colander_data_converter.base.types.observable import *
 25from colander_data_converter.base.types.threat import *
 26from colander_data_converter.converters.stix2.mapping import Stix2MappingLoader
 27from colander_data_converter.converters.stix2.models import (
 28    Stix2ObjectBase,
 29    Stix2ObjectTypes,
 30    Stix2Bundle,
 31    Stix2Repository,
 32    Relationship,
 33)
 34from colander_data_converter.converters.stix2.utils import (
 35    extract_uuid_from_stix2_id,
 36    get_nested_value,
 37    set_nested_value,
 38    extract_stix2_pattern_name,
 39    extract_stix2_pattern_value,
 40)
 41
 42
[docs] 43class Stix2Mapper: 44 """ 45 Base class for mapping between STIX2 and Colander data using the mapping file. 46 """ 47
[docs] 48 def __init__(self): 49 """ 50 Initialize the mapper. 51 """ 52 self.mapping_loader = Stix2MappingLoader() 53 ColanderRepository().clear()
54 55
[docs] 56class Stix2ToColanderMapper(Stix2Mapper): 57 """ 58 Maps STIX2 data to Colander data using the mapping file. 59 """ 60
[docs] 61 def convert(self, stix2_data: Dict[str, Any]) -> ColanderFeed: 62 """ 63 Convert STIX2 data to Colander data. 64 65 Args: 66 stix2_data (Dict[str, Any]): The STIX2 data to convert. 67 68 Returns: 69 ColanderFeed: The converted Colander data. 70 """ 71 repository = ColanderRepository() 72 73 # Keep track of processed STIX2 object IDs to handle duplicates 74 processed_ids: Dict[str, str] = {} 75 76 # Process STIX2 objects 77 for stix2_object in stix2_data.get("objects", []): 78 stix2_id = stix2_object.get("id", "") 79 stix2_type = stix2_object.get("type", "") 80 81 # Skip if this ID has already been processed with a different type 82 if stix2_id in processed_ids and processed_ids[stix2_id] != stix2_type: 83 # Generate a new UUID for this object to avoid overwriting 84 stix2_object = stix2_object.copy() 85 stix2_object["id"] = f"{stix2_type}--{uuid4()}" 86 87 colander_entity = self.convert_stix2_object(stix2_object) 88 if colander_entity: 89 repository << colander_entity 90 processed_ids[stix2_id] = stix2_type 91 92 # Handle object references 93 for stix2_object in stix2_data.get("objects", []): 94 stix2_id = stix2_object.get("id", "") 95 if stix2_id not in processed_ids: 96 continue 97 stix2_type = stix2_object.get("type", "") 98 if stix2_type == "relationship": 99 continue 100 for attr, value in stix2_object.items(): 101 if attr.endswith("_ref"): 102 self._convert_reference(attr, stix2_id, value) 103 elif attr.endswith("_refs"): 104 for ref in stix2_object.get("refs", []): 105 self._convert_reference(attr, stix2_id, ref) 106 107 bundle_id = extract_uuid_from_stix2_id(stix2_data.get("id", "")) 108 109 feed_data = { 110 "id": bundle_id, 111 "name": stix2_data.get("name", "STIX2 Feed"), 112 "description": stix2_data.get("description", "Converted from STIX2"), 113 "entities": repository.entities, 114 "relations": repository.relations, 115 } 116 117 return ColanderFeed.model_validate(feed_data)
118 119 def _convert_reference(self, name: str, source_id: str, target_id: str) -> Optional[EntityRelation]: 120 if not name or not source_id or not target_id: 121 return None 122 relation_name = name.replace("_refs", "").replace("_ref", "").replace("_", " ") 123 source_object_id = extract_uuid_from_stix2_id(source_id) 124 target_object_id = extract_uuid_from_stix2_id(target_id) 125 source = ColanderRepository() >> source_object_id 126 target = ColanderRepository() >> target_object_id 127 if not source or not target: 128 return None 129 relation = EntityRelation( 130 name=relation_name, 131 obj_from=source, 132 obj_to=target, 133 ) 134 ColanderRepository() << relation 135 return relation 136
[docs] 137 def convert_stix2_object( 138 self, stix2_object: Dict[str, Any] 139 ) -> Optional[ 140 Union[Actor, Device, Artifact, Observable, Threat, Event, DetectionRule, DataFragment, EntityRelation] 141 ]: 142 """ 143 Convert a STIX2 object to a Colander entity. 144 145 Args: 146 stix2_object (Dict[str, Any]): The STIX2 object to convert. 147 148 Returns: 149 Optional[Union[Actor, Device, Artifact, Observable, Threat, Event, DetectionRule, DataFragment, EntityRelation]]: 150 The converted Colander entity, or None if the object type is not supported. 151 """ 152 stix2_type = stix2_object.get("type", "") 153 154 # Get the Colander entity type for this STIX2 type 155 entity_type, entity_subtype_candidates = self.mapping_loader.get_entity_type_for_stix2(stix2_type) 156 157 if entity_type and entity_subtype_candidates: 158 # Use the appropriate conversion method based on the entity type 159 if entity_type == "actor": 160 return self._convert_to_actor(stix2_object, entity_subtype_candidates) 161 elif entity_type == "device": 162 return self._convert_to_device(stix2_object, entity_subtype_candidates) 163 elif entity_type == "artifact": 164 return self._convert_to_artifact(stix2_object, entity_subtype_candidates) 165 elif entity_type == "observable": 166 return self._convert_to_observable(stix2_object, entity_subtype_candidates) 167 elif entity_type == "threat": 168 return self._convert_to_threat(stix2_object, entity_subtype_candidates) 169 170 # Handle relationship objects 171 if stix2_type == "relationship": 172 return self._convert_to_relation(stix2_object) 173 174 return None
175 176 def _convert_to_entity( 177 self, 178 stix2_object: Dict[str, Any], 179 model_class: Type["EntityTypes"], 180 colander_entity_type, 181 default_name: str = "Unknown Entity", 182 ) -> Any: 183 # Get the field mapping for the entity type 184 colander_entity_super_type: CommonEntitySuperType = CommonEntitySuperTypes.by_short_name(model_class.__name__) 185 field_mapping = self.mapping_loader.get_stix2_to_colander_field_mapping(model_class.__name__) 186 187 if not colander_entity_type: 188 raise ValueError("Invalid entity type") 189 190 # Create the base entity data 191 stix2_id = stix2_object.get("id", "") 192 extracted_uuid = extract_uuid_from_stix2_id(stix2_id) 193 entity_data = { 194 "id": extracted_uuid, 195 "name": stix2_object.get("name", default_name), 196 "description": stix2_object.get("description", ""), 197 "super_type": colander_entity_super_type, 198 "type": colander_entity_type, 199 "attributes": {}, 200 } 201 202 # Apply the field mapping 203 for stix2_field, colander_field in field_mapping.items(): 204 value = get_nested_value(stix2_object, stix2_field) 205 if value is not None: 206 if "." in colander_field: 207 # Handle nested fields 208 set_nested_value(entity_data, colander_field, value) 209 else: 210 entity_data[colander_field] = value 211 212 # Add any additional attributes from the STIX2 object 213 _ignore = ["id", "type"] 214 for key, value in stix2_object.items(): 215 if key not in field_mapping and key not in _ignore and isinstance(value, (str, int, float, bool)): 216 entity_data["attributes"][key] = str(value) 217 218 try: 219 return model_class.model_validate(entity_data) 220 except Exception as e: 221 raise e 222 223 def _get_actor_type(self, stix2_object: Dict[str, Any], subtype_candidates: Optional[List[str]]) -> str: 224 default_type = ArtifactTypes.default.value.short_name.lower() 225 if not subtype_candidates: 226 return default_type 227 228 if stix2_object.get("type", "") == "threat-actor": 229 default_type = "threat_actor" 230 for subtype_candidate in subtype_candidates: 231 if subtype_candidate.lower() in stix2_object.get("threat_actor_types", []): 232 return subtype_candidate 233 return default_type 234 235 if len(subtype_candidates) == 1: 236 return subtype_candidates[0] 237 238 return default_type 239 240 def _convert_to_actor(self, stix2_object: Dict[str, Any], subtype_candidates: Optional[List[str]]) -> Actor: 241 _stix2_object = stix2_object.copy() 242 if "threat_actor_types" in _stix2_object and _stix2_object["threat_actor_types"] is not None: 243 _stix2_object["threat_actor_types"] = ",".join(_stix2_object["threat_actor_types"]) 244 245 _actor_type = self._get_actor_type(_stix2_object, subtype_candidates) 246 247 return self._convert_to_entity( 248 stix2_object=_stix2_object, 249 model_class=Actor, 250 colander_entity_type=ActorTypes.by_short_name(_actor_type), 251 ) 252 253 def _get_device_type(self, stix2_object: Dict[str, Any], subtype_candidates: Optional[List[str]]) -> str: 254 default_type = DeviceTypes.default.value.short_name.lower() 255 if not subtype_candidates: 256 return default_type 257 258 if len(subtype_candidates) == 1: 259 return subtype_candidates[0] 260 261 for subtype_candidate in subtype_candidates: 262 if subtype_candidate.lower() in stix2_object.get("infrastructure_types", []): 263 return subtype_candidate 264 265 return default_type 266 267 def _convert_to_device(self, stix2_object: Dict[str, Any], subtype_candidates: Optional[List[str]]) -> Device: 268 _stix2_object = stix2_object.copy() 269 if "infrastructure_types" in _stix2_object and _stix2_object["infrastructure_types"] is not None: 270 _stix2_object["infrastructure_types"] = ",".join(_stix2_object["infrastructure_types"]) 271 272 _device_type = self._get_device_type(_stix2_object, subtype_candidates) 273 return self._convert_to_entity( 274 stix2_object=_stix2_object, 275 model_class=Device, 276 colander_entity_type=DeviceTypes.by_short_name(_device_type), 277 ) 278 279 def _get_artifact_type(self, stix2_object: Dict[str, Any], subtype_candidates: Optional[List[str]]) -> str: 280 default_type = ArtifactTypes.default.value.short_name.lower() 281 if not subtype_candidates: 282 return default_type 283 284 artifact_type = ArtifactTypes.by_mime_type(stix2_object.get("mime_type", "unspecified")).short_name 285 286 return artifact_type or default_type 287 288 def _convert_to_artifact(self, stix2_object: Dict[str, Any], subtype_candidates: Optional[List[str]]) -> Artifact: 289 _artifact_type = self._get_artifact_type(stix2_object, subtype_candidates) 290 291 return self._convert_to_entity( 292 stix2_object=stix2_object, 293 model_class=Artifact, 294 colander_entity_type=ArtifactTypes.by_short_name(_artifact_type), 295 ) 296 297 def _get_observable_type(self, stix2_object: Dict[str, Any], subtype_candidates: Optional[List[str]]) -> str: 298 default_type = ObservableTypes.default.value.short_name.lower() 299 if not subtype_candidates: 300 return default_type 301 302 _pattern_name = extract_stix2_pattern_name(stix2_object.get("pattern", "")) or "unspecified" 303 for _candidate in subtype_candidates: 304 _mapping = self.mapping_loader.get_entity_subtype_mapping("observable", _candidate) 305 if _pattern_name in _mapping["pattern"]: 306 return _candidate 307 308 # Return the generic subtype as it was not possible to narrow down the type selection 309 return default_type 310 311 def _convert_to_observable( 312 self, stix2_object: Dict[str, Any], subtype_candidates: Optional[List[str]] 313 ) -> Observable: 314 _observable_type = self._get_observable_type(stix2_object, subtype_candidates) 315 # Use the generic conversion method 316 observable = self._convert_to_entity( 317 stix2_object=stix2_object, 318 model_class=Observable, 319 colander_entity_type=ObservableTypes.by_short_name(_observable_type), 320 ) 321 # Extract value from pattern 322 pattern = stix2_object.get("pattern", "") 323 extracted_value = extract_stix2_pattern_value(pattern) 324 if extracted_value: 325 observable.name = extracted_value 326 return observable 327 328 def _get_threat_type(self, stix2_object: Dict[str, Any], subtype_candidates: Optional[List[str]]) -> str: 329 default_type = ThreatTypes.default.value.short_name.lower() 330 if not subtype_candidates: 331 return default_type 332 333 for _candidate in subtype_candidates: 334 if _candidate in stix2_object.get("malware_types", []): 335 return _candidate 336 337 # Return the generic subtype as it was not possible to narrow down the type selection 338 return default_type 339 340 def _convert_to_threat(self, stix2_object: Dict[str, Any], subtype_candidates: Optional[List[str]]) -> Threat: 341 _threat_type = self._get_threat_type(stix2_object, subtype_candidates) 342 # Use the generic conversion method 343 return self._convert_to_entity( 344 stix2_object=stix2_object, 345 model_class=Threat, 346 colander_entity_type=ThreatTypes.by_short_name(_threat_type), 347 ) 348 349 def _convert_to_relation(self, stix2_object: Dict[str, Any]) -> Optional[EntityRelation]: 350 relationship_type = stix2_object.get("relationship_type", "") 351 source_ref = stix2_object.get("source_ref", "") 352 target_ref = stix2_object.get("target_ref", "") 353 354 if not relationship_type or not source_ref or not target_ref: 355 return None 356 357 # Extract UUIDs from the references 358 source_id = extract_uuid_from_stix2_id(source_ref) 359 target_id = extract_uuid_from_stix2_id(target_ref) 360 361 if not source_id or not target_id: 362 return None 363 364 # Create the relation data 365 relation_data = { 366 "id": extract_uuid_from_stix2_id(stix2_object.get("id", "")), 367 "name": stix2_object.get("name", relationship_type), 368 "description": stix2_object.get("description", ""), 369 "created_at": stix2_object.get("created"), 370 "updated_at": stix2_object.get("modified"), 371 "obj_from": source_id, 372 "obj_to": target_id, 373 "attributes": {}, 374 } 375 376 # Add any additional attributes from the STIX2 object 377 for key, value in stix2_object.items(): 378 if key not in [ 379 "id", 380 "type", 381 "name", 382 "description", 383 "created", 384 "modified", 385 "source_ref", 386 "target_ref", 387 ] and isinstance(value, (str, int, float, bool)): 388 relation_data["attributes"][key] = str(value) 389 390 return EntityRelation.model_validate(relation_data)
391 392
[docs] 393class ColanderToStix2Mapper(Stix2Mapper): 394 """ 395 Maps Colander data to STIX2 data using the mapping file. 396 """ 397
[docs] 398 def convert(self, colander_feed: ColanderFeed) -> Stix2Bundle: 399 stix2_data = { 400 "type": "bundle", 401 "id": f"bundle--{colander_feed.id or uuid4()}", 402 "spec_version": "2.1", 403 "objects": [], 404 } 405 406 # Convert entities 407 for _, entity in colander_feed.entities.items(): 408 if not issubclass(entity.__class__, Entity): 409 continue 410 if entity.super_type.short_name.lower() not in self.mapping_loader.get_supported_colander_types(): 411 continue 412 stix2_object = self.convert_colander_entity(entity) 413 if stix2_object: 414 stix2_data["objects"].append(stix2_object) 415 416 bundle = Stix2Bundle(**stix2_data) 417 418 # Extract and convert immutable relations 419 for _, entity in colander_feed.entities.items(): 420 if not issubclass(entity.__class__, Entity): 421 continue 422 if entity.super_type.short_name.lower() not in self.mapping_loader.get_supported_colander_types(): 423 continue 424 for _, relation in entity.get_immutable_relations( 425 mapping=self.mapping_loader.get_field_relationship_mapping(), default_name="related-to" 426 ).items(): 427 stix2_object = self.convert_colander_relation(relation) 428 if stix2_object: 429 bundle.objects.append(Relationship(**stix2_object)) 430 431 # Convert relations 432 for relation_id, relation in colander_feed.relations.items(): 433 if isinstance(relation, EntityRelation): 434 stix2_object = self.convert_colander_relation(relation) 435 if stix2_object: 436 bundle.objects.append(Relationship(**stix2_object)) 437 438 return bundle
439
[docs] 440 def convert_colander_entity( 441 self, entity: Union[Actor, Device, Artifact, Observable, Threat] 442 ) -> Optional[Dict[str, Any]]: 443 """ 444 Convert a Colander entity to a STIX2 object. 445 446 Args: 447 entity: The Colander entity to convert. 448 449 Returns: 450 Optional[Dict[str, Any]]: The converted STIX2 object, or None if the entity type is not supported. 451 """ 452 if isinstance(entity, Actor): 453 return self._convert_from_actor(entity) 454 elif isinstance(entity, Device): 455 return self._convert_from_device(entity) 456 elif isinstance(entity, Artifact): 457 return self._convert_from_artifact(entity) 458 elif isinstance(entity, Observable): 459 return self._convert_from_observable(entity) 460 elif isinstance(entity, Threat): 461 return self._convert_from_threat(entity) 462 463 return None
464
[docs] 465 def convert_colander_relation(self, relation: EntityRelation) -> Optional[Dict[str, Any]]: 466 """ 467 Convert a Colander EntityRelation to a STIX2 relationship object. 468 469 Args: 470 relation (~colander_data_converter.base.models.EntityRelation): The Colander EntityRelation to convert. 471 472 Returns: 473 Optional[Dict[str, Any]]: The converted STIX2 relationship object, or None if the relation cannot be 474 converted. 475 """ 476 return self._convert_from_relation(relation)
477 478 def _convert_from_entity( 479 self, entity: Any, additional_fields: Optional[Dict[str, Any]] = None 480 ) -> Optional[Stix2ObjectTypes]: 481 # Get the STIX2 type for the entity 482 stix2_type = self.mapping_loader.get_stix2_type_for_entity(entity) 483 if not stix2_type or (model_class := Stix2ObjectBase.get_model_class(stix2_type)) is None: 484 return None 485 486 # Get the field mapping for the entity type 487 entity_type = entity.get_super_type().short_name 488 field_mapping = self.mapping_loader.get_colander_to_stix2_field_mapping(entity_type) 489 490 # Create the base STIX2 object 491 stix2_object = { 492 "type": stix2_type, 493 "id": f"{stix2_type}--{entity.id}", 494 "created": entity.created_at.isoformat(), 495 "modified": entity.updated_at.isoformat(), 496 } 497 498 if "name" in model_class.model_fields: 499 stix2_object["name"] = entity.name 500 501 # Add any additional fields 502 if additional_fields: 503 stix2_object.update(additional_fields) 504 505 # Apply the field mapping 506 for colander_field, stix2_field in field_mapping.items(): 507 value = get_nested_value(entity.model_dump(), colander_field) 508 if value is not None: 509 if "." in stix2_field: 510 # Handle nested fields 511 set_nested_value(stix2_object, stix2_field, value) 512 else: 513 stix2_object[stix2_field] = value 514 515 # Add any additional attributes 516 if hasattr(entity, "attributes") and entity.attributes: 517 for key, value in entity.attributes.items(): 518 if key not in [field.split(".")[-1] for field in field_mapping.keys() if "." in field]: 519 stix2_object[key] = value 520 521 return model_class(**stix2_object) 522 523 def _convert_from_actor(self, actor: Actor) -> Optional[Dict[str, Any]]: 524 mapping = self.mapping_loader.get_actor_mapping(actor.type.short_name) 525 if not mapping: 526 return None 527 extra_attributes = self.mapping_loader.get_entity_extra_values("actor", actor.type.short_name) 528 529 return self._convert_from_entity(actor, extra_attributes) 530 531 def _convert_from_device(self, device: Device) -> Optional[Dict[str, Any]]: 532 mapping = self.mapping_loader.get_device_mapping(device.type.short_name) 533 if not mapping: 534 return None 535 extra_attributes = self.mapping_loader.get_entity_extra_values("device", device.type.short_name) 536 return self._convert_from_entity(device, extra_attributes) 537 538 def _convert_from_artifact(self, artifact: Artifact) -> Dict[str, Any]: 539 return self._convert_from_entity(artifact) 540 541 def _generate_observable_pattern(self, observable: Observable) -> Dict[str, Any]: 542 pattern_fields = {} 543 544 observable_type_short_name = observable.type.short_name.lower() 545 pattern_template = self.mapping_loader.get_observable_pattern(observable_type_short_name) 546 pattern_fields["pattern_type"] = "stix" 547 if pattern_template: 548 pattern_fields["pattern"] = pattern_template.format(value=observable.name) 549 # If the observable type is not found in the mapping, use a generic pattern 550 else: 551 pattern_fields["pattern"] = f"[unknown:value = '{observable.name}']" 552 553 return pattern_fields 554 555 def _convert_from_observable(self, observable: Observable) -> Dict[str, Any]: 556 # Generate pattern fields for the observable 557 pattern_fields = self._generate_observable_pattern(observable) 558 additional_fields = {} 559 additional_fields.update(pattern_fields) 560 561 # Add indicator_types to the additional fields 562 if observable.associated_threat is not None: 563 additional_fields.update({"indicator_types": ["malicious-activity"]}) 564 565 return self._convert_from_entity(observable, additional_fields) 566 567 def _get_threat_malware_types(self, threat: Threat) -> Dict[str, Any]: 568 additional_fields = {} 569 570 # Get the STIX2 type for threats 571 stix2_type = self.mapping_loader.get_stix2_type_for_entity(threat) 572 573 # Add malware_types if the type is malware 574 if stix2_type == "malware": 575 threat_type_short_name = threat.type.short_name.lower() 576 malware_types = self.mapping_loader.get_malware_types_for_threat(threat_type_short_name) 577 if malware_types: 578 additional_fields["malware_types"] = malware_types 579 else: 580 additional_fields["malware_types"] = ["unknown", threat_type_short_name] 581 582 return additional_fields 583 584 def _convert_from_threat(self, threat: Threat) -> Dict[str, Any]: 585 additional_fields = self._get_threat_malware_types(threat) 586 return self._convert_from_entity(threat, additional_fields) 587 588 def _convert_from_relation(self, relation: EntityRelation) -> Optional[Dict[str, Any]]: 589 if not relation.obj_from or not relation.obj_to: 590 return None 591 592 if not relation.is_fully_resolved(): 593 return None 594 595 supported_types = self.mapping_loader.get_supported_colander_types() 596 if ( 597 relation.obj_from.super_type.short_name.lower() not in supported_types 598 or relation.obj_to.super_type.short_name.lower() not in supported_types 599 ): 600 return None 601 602 source_prefix = self.mapping_loader.get_stix2_type_for_entity(relation.obj_from) or "unknown" 603 target_prefix = self.mapping_loader.get_stix2_type_for_entity(relation.obj_to) or "unknown" 604 source_ref = f"{source_prefix}--{relation.obj_from.id}" 605 target_ref = f"{target_prefix}--{relation.obj_to.id}" 606 repository = Stix2Repository() 607 source = repository >> source_ref 608 target = repository >> target_ref 609 610 if not source or not target: 611 return None 612 613 # Create the base STIX2 relationship object 614 stix2_object = { 615 "type": "relationship", 616 "id": f"relationship--{relation.id}", 617 "relationship_type": relation.name.replace(" ", "-"), 618 "created": relation.created_at.isoformat(), 619 "modified": relation.updated_at.isoformat(), 620 "source_ref": f"{source_prefix}--{relation.obj_from.id}", 621 "target_ref": f"{target_prefix}--{relation.obj_to.id}", 622 } 623 624 # Add any additional attributes 625 if hasattr(relation, "attributes") and relation.attributes: 626 for key, value in relation.attributes.items(): 627 stix2_object[key] = value 628 629 return stix2_object
630 631
[docs] 632class Stix2Converter: 633 """ 634 Converter for STIX2 data to Colander data and vice versa. 635 Uses the mapping file to convert between formats. 636 """ 637
[docs] 638 @staticmethod 639 def stix2_to_colander(stix2_data: Dict[str, Any]) -> ColanderFeed: 640 """ 641 Converts STIX2 data to Colander data using the mapping file. 642 643 Args: 644 stix2_data (Dict[str, Any]): The STIX2 data to convert. 645 646 Returns: 647 ColanderFeed: The converted Colander data. 648 """ 649 mapper = Stix2ToColanderMapper() 650 return mapper.convert(stix2_data)
651
[docs] 652 @staticmethod 653 def colander_to_stix2(colander_feed: ColanderFeed) -> Stix2Bundle: 654 """ 655 Converts Colander data to STIX2 data using the mapping file. 656 657 Args: 658 colander_feed (ColanderFeed): The Colander data to convert. 659 660 Returns: 661 Stix2Bundle: The converted STIX2 bundle. 662 """ 663 mapper = ColanderToStix2Mapper() 664 colander_feed.resolve_references() 665 return mapper.convert(colander_feed)