Source code for colander_data_converter.base.utils

  1import enum
  2from typing import get_args, List, Any, Optional, Dict
  3
  4from pydantic import BaseModel, UUID4
  5
  6from colander_data_converter.base.common import ObjectReference
  7from colander_data_converter.base.models import ColanderFeed, Entity, EntityRelation
  8
  9
class MergingStrategy(str, enum.Enum):
    """How :py:class:`BaseModelMerger` treats a destination field that already holds a value.

    - ``PRESERVE``: keep the destination value; only fill fields that are empty or ``None``.
    - ``OVERWRITE``: always replace the destination value with the source value.

    Members are also ``str`` instances, so they compare equal to their raw values
    (``MergingStrategy.PRESERVE == "preserve"``).
    """

    PRESERVE = "preserve"
    OVERWRITE = "overwrite"
class BaseModelMerger:
    """
    A utility class for merging :py:class:`pydantic.BaseModel` instances with configurable strategies.

    This class provides functionality to merge fields from a source BaseModel into a
    destination BaseModel, handling both regular model fields and extra attributes.
    Fields containing `ObjectReference` types are automatically
    excluded from merging and reported as unprocessed.

    The merger supports two strategies:

    - ``PRESERVE``: Only merge fields (model fields and extra attributes alike) if the
      destination value is empty or `None`
    - ``OVERWRITE``: Always merge fields from source to destination

    Fields are merged based on type compatibility and field constraints. Extra
    attributes are automatically converted to strings when stored in the attribute
    dictionary (if supported by the destination model).

    Example:
        >>> from pydantic import BaseModel
        >>> class SourceModel(BaseModel):
        ...     name: str
        ...     age: int
        ...     attributes: dict = {}
        >>> class DestinationModel(BaseModel):
        ...     name: str
        ...     age: int
        ...     city: str = "Unknown"
        ...     attributes: dict = {}
        >>> source = SourceModel(name="Alice", age=30)
        >>> destination = DestinationModel(name="Bob", age=25)
        >>> merger = BaseModelMerger(strategy=MergingStrategy.OVERWRITE)
        >>> unprocessed = merger.merge(source, destination)
        >>> print(destination.name)
        Alice
        >>> print(destination.age)
        30
        >>> print(destination.city)
        Unknown

    Note:
        - Fields with ``ObjectReference`` types are never merged and are reported as unprocessed
        - Frozen fields cannot be modified and will be reported as unprocessed
        - Names listed in ``ignored_fields`` are skipped whether they come from the source
          model fields or from the source extra attributes
        - Complex types (list, dict, tuple, set) in extra attributes are not supported
        - Extra attributes are converted to strings when stored
    """

    def __init__(self, strategy: MergingStrategy = MergingStrategy.OVERWRITE):
        """Initialize the ``BaseModelMerger`` with a merging strategy.

        Args:
            strategy: The strategy to use when merging fields.
        """
        self.strategy = strategy

    def merge_field(
        self, destination: BaseModel, field_name: str, field_value: Any, ignored_fields: Optional[List[str]] = None
    ) -> bool:
        """Merge a single field from source to destination model.

        This method handles the logic for merging individual fields, including
        type checking, field existence validation, and attribute handling. It
        processes both regular model fields and extra attributes based on the
        destination model's capabilities and field constraints.

        Note:
            The method follows these rules:

            - Skips empty/None (falsy) field values
            - Skips fields listed in ignored_fields
            - For fields not in the destination model schema: stores the value as a
              string in the ``attributes`` dict (if the destination has one, creating
              the dict when it is ``None``) unless the value is a complex type, an
              ``ObjectReference`` or a ``BaseModel``. With ``PRESERVE``, an attribute the
              destination already holds (non-empty) is left untouched.
            - For schema fields: merges only if type-compatible, not frozen, not
              containing ObjectReference, and destination is empty (``PRESERVE``) or
              strategy is ``OVERWRITE``

        Args:
            destination: The target model to merge into.
            field_name: The name of the field to merge.
            field_value: The value to merge from the source.
            ignored_fields: List of field names to skip during merging.

        Returns:
            True if the field was processed (successfully merged or handled),
            False if the field could not be processed
        """
        # Falsy source values carry nothing worth merging.
        if not field_value:
            return False
        if ignored_fields and field_name in ignored_fields:
            return False

        model_fields = destination.__class__.model_fields
        source_field_value_type = type(field_value)

        if field_name not in model_fields:
            # Unknown to the destination schema: fall back to the extra attribute dict, if any.
            if (
                not hasattr(destination, "attributes")
                or source_field_value_type in [list, dict, tuple, set, ObjectReference]
                or isinstance(field_value, BaseModel)
            ):
                return False
            if destination.attributes is None:
                # merge() initializes the dict too, but merge_field() is public and may be called directly.
                destination.attributes = {}
            if self.strategy == MergingStrategy.PRESERVE and destination.attributes.get(field_name):
                # PRESERVE must not clobber an attribute the destination already has.
                return False
            destination.attributes[field_name] = str(field_value)
            return True

        field_info = model_fields[field_name]
        # For Optional[X] / Union[...] annotations this yields the member types; () for plain types.
        annotation_args = get_args(field_info.annotation) or ()
        if (
            ObjectReference in annotation_args
            or List[ObjectReference] in annotation_args
            or field_info.frozen
            or (getattr(destination, field_name, None) and self.strategy != MergingStrategy.OVERWRITE)
            or not (source_field_value_type is field_info.annotation or source_field_value_type in annotation_args)
        ):
            return False
        setattr(destination, field_name, field_value)
        return True

    def merge(self, source: BaseModel, destination: BaseModel, ignored_fields: Optional[List[str]] = None) -> List[str]:
        """Merge all compatible fields from the source object into the destination object.

        This method iterates through all fields in the source object and attempts
        to merge them into the destination object. It handles both regular object
        fields and the extra attributes dictionary if supported.

        Args:
            source: The source model to merge from
            destination: The destination model to merge to
            ignored_fields: List of field names to skip during merging. The list applies
                both to the source model fields and to the keys of the source extra
                attributes, so an ignored field cannot be set through an attribute of
                the same name.

        Returns:
            A list of field names that could not be processed during
            the merge operation. Fields containing ObjectReference types
            are automatically added to this list. Extra attributes are
            reported as ``attributes.<name>``.
        """
        unprocessed_fields: List[str] = []
        # Captured before the field loop: that loop may reassign destination.attributes.
        source_attributes = getattr(source, "attributes", None)

        if hasattr(destination, "attributes") and getattr(destination, "attributes", None) is None:
            destination.attributes = {}

        # Merge model fields
        for field_name, field_info in source.__class__.model_fields.items():
            source_field_value = getattr(source, field_name, None)
            if ObjectReference in get_args(field_info.annotation):
                unprocessed_fields.append(field_name)
            elif not self.merge_field(destination, field_name, source_field_value, ignored_fields):
                unprocessed_fields.append(field_name)

        # Merge extra attributes, honouring ignored_fields as well
        if source_attributes:
            for name, value in source_attributes.items():
                if not self.merge_field(destination, name, value, ignored_fields):
                    unprocessed_fields.append(f"attributes.{name}")

        return unprocessed_fields
class FeedMerger:
    """Merge the entities and relations of a source :py:class:`ColanderFeed` into a destination feed.

    The destination feed is modified in place. Entities added from the source feed
    are inserted by reference, not copied. After :py:meth:`merge` has run, these
    attributes record what happened:

    - ``id_rewrite``: source entity id -> id of the corresponding entity in the destination feed
    - ``added_entities``: entities inserted as-is into the destination feed
    - ``merged_entities``: source entity -> destination entity it was merged into
    - ``merging_candidates``: not populated by :py:meth:`merge`; kept for backward compatibility
    """

    def __init__(self, source_feed: ColanderFeed, destination_feed: ColanderFeed):
        """Prepare a merge of ``source_feed`` into ``destination_feed``.

        Args:
            source_feed: The feed whose entities and relations are merged.
            destination_feed: The feed receiving the merged content (modified in place).
        """
        self.source_feed = source_feed
        self.destination_feed = destination_feed
        self.id_rewrite: Dict[UUID4, UUID4] = {}  # source, destination
        self.merging_candidates: Dict[Entity, Entity] = {}  # source, destination
        self.added_entities: List[Entity] = []  # source added to the destination feed
        self.merged_entities: Dict[Entity, Entity] = {}  # source, destination

    def merge(self, delete_unlinked: bool = False, aggressive: bool = False):
        """
        Merge the source feed into the destination feed.

        This method performs a comprehensive merge operation between two ColanderFeeds,
        handling entity merging, relation updates, and immutable relation management.
        The merge process follows these main steps:

        1. Identify merging candidates based on entity similarity
        2. Merge compatible entities or add new ones to the destination feed
        3. Update immutable relation references after merging
        4. Copy non-immutable relations from source to destination
        5. Delete (or report) relations whose endpoints are absent from the destination feed

        Args:
            delete_unlinked: If True, delete relations involving missing entities
            aggressive: If True, temporarily breaks and rebuilds immutable relations
                to allow more flexible merging. Use with caution as this may
                affect data integrity during the merge process. Immutable relations
                are rebuilt even if the merge raises.

        Raises:
            Exception: If unlinked relations remain and ``delete_unlinked`` is False.
        """
        if aggressive:
            self.source_feed.break_immutable_relations()
            self.destination_feed.break_immutable_relations()
        try:
            self._merge_entities()
            self._resolve_immutable_relations()
            self._copy_relations()
            self._handle_unlinked_relations(delete_unlinked)
        finally:
            # Restore immutable relations even on failure so neither feed is left in a broken state.
            if aggressive:
                self.source_feed.rebuild_immutable_relations()
                self.destination_feed.rebuild_immutable_relations()

    def _merge_entities(self) -> None:
        """Merge each source entity into its unique similar destination entity, or add it as-is."""
        model_merger = BaseModelMerger(strategy=MergingStrategy.PRESERVE)
        for source_entity in self.source_feed.entities.values():
            destination_candidates = self.destination_feed.get_entities_similar_to(source_entity)
            # Zero or several candidates, or a source entity with immutable relations: add it unchanged.
            if len(destination_candidates) != 1 or len(source_entity.get_immutable_relations()) > 0:
                self.destination_feed.entities[str(source_entity.id)] = source_entity
                self.id_rewrite[source_entity.id] = source_entity.id
                self.added_entities.append(source_entity)
                continue
            # Exactly one candidate and no immutable relations on the source entity: merge into it.
            _, destination_candidate = destination_candidates.popitem()
            model_merger.merge(source_entity, destination_candidate)
            destination_candidate.touch()
            self.id_rewrite[source_entity.id] = destination_candidate.id
            self.merged_entities[source_entity] = destination_candidate

    def _resolve_immutable_relations(self) -> None:
        """Add missing immutable-relation targets and repoint references to merged entities."""
        # Iterate a snapshot: missing targets are inserted into this same dict below, which would
        # otherwise raise "RuntimeError: dictionary changed size during iteration".
        # Entities added by this step are not scanned themselves.
        for destination_entity in list(self.destination_feed.entities.values()):
            for immutable_relation in destination_entity.get_immutable_relations().values():
                target = immutable_relation.obj_to
                # The relation target is missing: add it to the destination feed
                if (
                    target not in self.merged_entities
                    and target not in self.added_entities
                    and not self.destination_feed.contains(target)
                ):
                    self.destination_feed.entities[str(target.id)] = target
                    self.added_entities.append(target)
                # The relation target has been merged: update the reference
                elif target in self.merged_entities:
                    merged_target = self.merged_entities[target]
                    # NOTE(review): assumes the relation name matches the attribute holding the reference.
                    object_reference = getattr(destination_entity, immutable_relation.name)
                    if not object_reference:
                        continue
                    if isinstance(object_reference, list):
                        object_reference.remove(target)
                        object_reference.append(merged_target)
                    else:
                        setattr(destination_entity, immutable_relation.name, merged_target)

    def _copy_relations(self) -> None:
        """Copy source relations (rewired to merged entities) unless an equivalent one already exists."""
        for source_relation in self.source_feed.relations.values():
            obj_from: Entity = self.merged_entities.get(source_relation.obj_from, source_relation.obj_from)
            obj_to: Entity = self.merged_entities.get(source_relation.obj_to, source_relation.obj_to)
            outgoing = self.destination_feed.get_outgoing_relations(obj_from, exclude_immutables=True)
            relation_exists = any(
                relation.obj_to == obj_to and relation.name == source_relation.name for relation in outgoing.values()
            )
            if not relation_exists:
                self.destination_feed.relations[str(source_relation.id)] = EntityRelation(
                    id=source_relation.id,
                    name=source_relation.name,
                    obj_from=obj_from,
                    obj_to=obj_to,
                )

    def _handle_unlinked_relations(self, delete_unlinked: bool) -> None:
        """Delete, or raise on, destination relations whose endpoints are not in the destination feed."""
        # Collect the actual dict keys so deletion cannot miss an entry keyed differently from str(id).
        unlinked_relation_keys = [
            key
            for key, relation in self.destination_feed.relations.items()
            if not self.destination_feed.contains(relation.obj_from)
            or not self.destination_feed.contains(relation.obj_to)
        ]
        if delete_unlinked:
            for key in unlinked_relation_keys:
                self.destination_feed.relations.pop(key)
        elif unlinked_relation_keys:
            raise Exception(f"{len(unlinked_relation_keys)} unlinked relation detected")