1import enum
2from typing import get_args, List, Any, Optional, Dict
3
4from pydantic import BaseModel, UUID4
5
6from colander_data_converter.base.common import ObjectReference
7from colander_data_converter.base.models import ColanderFeed, Entity, EntityRelation
8
9
[docs]
class MergingStrategy(str, enum.Enum):
    """Policy used by ``BaseModelMerger`` when a destination field may already hold a value.

    Members subclass ``str``, so each one compares equal to its plain string value
    (e.g. ``MergingStrategy.PRESERVE == "preserve"``).
    """

    # Write a source value only when the destination field is currently empty/falsy.
    PRESERVE = "preserve"
    # Always write the source value, replacing whatever the destination holds.
    OVERWRITE = "overwrite"
13
14
[docs]
class BaseModelMerger:
    """
    A utility class for merging :py:class:`pydantic.BaseModel` instances with configurable strategies.

    This class provides functionality to merge fields from a source BaseModel into a
    destination BaseModel, handling both regular model fields and extra attributes.
    Fields containing ``ObjectReference`` types are automatically
    excluded from merging and reported as unprocessed.

    The merger supports two strategies:

    - ``PRESERVE``: Only merge fields if the destination field is empty (falsy) or ``None``
    - ``OVERWRITE``: Always merge fields from source to destination

    Fields are merged based on type compatibility and field constraints. Extra
    attributes are automatically converted to strings when stored in the attribute
    dictionary (if supported by the destination model). The source ``attributes``
    dictionary is merged entry by entry, so the destination never ends up sharing
    the source's dictionary object.

    Example:
        >>> from pydantic import BaseModel
        >>> class SourceModel(BaseModel):
        ...     name: str
        ...     age: int
        ...     attributes: dict = {}
        >>> class DestinationModel(BaseModel):
        ...     name: str
        ...     age: int
        ...     city: str = "Unknown"
        ...     attributes: dict = {}
        >>> source = SourceModel(name="Alice", age=30)
        >>> destination = DestinationModel(name="Bob", age=25)
        >>> merger = BaseModelMerger(strategy=MergingStrategy.OVERWRITE)
        >>> unprocessed = merger.merge(source, destination)
        >>> print(destination.name)
        Alice
        >>> print(destination.age)
        30
        >>> print(destination.city)
        Unknown

    Note:
        - Fields with ``ObjectReference`` types are never merged and are reported as unprocessed
        - Frozen fields cannot be modified and will be reported as unprocessed
        - Falsy source values (``None``, empty strings/containers, but also ``0`` and ``False``)
          are never merged
        - Complex values (list, dict, tuple, set, nested models) cannot be stored as extra attributes
        - Extra attributes are converted to strings when stored
    """

    def __init__(self, strategy: MergingStrategy = MergingStrategy.OVERWRITE):
        """Initialize the ``BaseModelMerger`` with a merging strategy.

        Args:
            strategy: The strategy to use when merging fields.
        """
        self.strategy = strategy

    def merge_field(
        self, destination: BaseModel, field_name: str, field_value: Any, ignored_fields: Optional[List[str]] = None
    ) -> bool:
        """Merge a single field from source to destination model.

        This method handles the logic for merging individual fields, including
        type checking, field existence validation, and attribute handling. It
        processes both regular model fields and extra attributes based on the
        destination model's capabilities and field constraints.

        Note:
            The method follows these rules:

            - Skips fields listed in ``ignored_fields``
            - Skips falsy field values (``None``, empty strings/containers, ``0``, ``False``)
            - For fields not in the destination model schema: stores the value as a string in
              the ``attributes`` dict (if the destination has one; it is created when ``None``)
              unless the value is a list, dict, tuple, set, ``ObjectReference`` or BaseModel
            - For schema fields: merges only if type-compatible, not frozen, not
              containing ObjectReference, and destination is empty (``PRESERVE``) or
              strategy is ``OVERWRITE``

        Args:
            destination: The target model to merge into.
            field_name: The name of the field to merge.
            field_value: The value to merge from the source.
            ignored_fields: List of field names to skip during merging.

        Returns:
            True if the field was processed (successfully merged or handled),
            False if the field could not be processed
        """
        if not field_value:
            return False
        if ignored_fields and field_name in ignored_fields:
            return False

        model_fields = destination.__class__.model_fields
        value_type = type(field_value)

        if field_name not in model_fields:
            # Not part of the destination schema: keep it as a stringified extra attribute, if possible.
            if not hasattr(destination, "attributes"):
                return False
            # ObjectReference is compared by type equality (as before) because it may not be a plain class.
            if isinstance(field_value, (list, dict, tuple, set, BaseModel)) or value_type == ObjectReference:
                return False
            if getattr(destination, "attributes", None) is None:
                # merge() initializes this, but merge_field() can also be called on its own.
                destination.attributes = {}
            destination.attributes[field_name] = str(field_value)
            return True

        field_info = model_fields[field_name]
        annotation_args = get_args(field_info.annotation)
        holds_reference = ObjectReference in annotation_args or List[ObjectReference] in annotation_args
        type_compatible = value_type is field_info.annotation or value_type in annotation_args
        may_write = self.strategy == MergingStrategy.OVERWRITE or not getattr(destination, field_name, None)
        if holds_reference or field_info.frozen or not type_compatible or not may_write:
            return False
        setattr(destination, field_name, field_value)
        return True

    def merge(self, source: BaseModel, destination: BaseModel, ignored_fields: Optional[List[str]] = None) -> List[str]:
        """Merge all compatible fields from the source object into the destination object.

        This method iterates through all fields in the source object and attempts
        to merge them into the destination object. The source ``attributes`` dictionary,
        when non-empty, is merged entry by entry (never assigned wholesale), and its
        entries honour ``ignored_fields`` like regular fields do. Listing ``"attributes"``
        in ``ignored_fields`` skips the extra attributes entirely.

        Args:
            source: The source model to merge from
            destination: The destination model to merge to
            ignored_fields: List of field names to skip during merging

        Returns:
            A list of field names that could not be processed during
            the merge operation. Fields containing ObjectReference types
            are automatically added to this list; extra attributes that could
            not be merged are reported as ``"attributes.<name>"``.
        """
        ignored: List[str] = list(ignored_fields) if ignored_fields else []
        unprocessed_fields: List[str] = []
        source_attributes = getattr(source, "attributes", None)

        if hasattr(destination, "attributes") and getattr(destination, "attributes", None) is None:
            destination.attributes = {}

        # Non-empty extra attributes are handled entry by entry below instead of as a model field;
        # assigning the whole dict would make the destination share the source's dict object.
        merge_extra_attributes = bool(source_attributes) and "attributes" not in ignored

        # Merge model fields
        for field_name, field_info in source.__class__.model_fields.items():
            if field_name == "attributes" and merge_extra_attributes:
                continue
            source_field_value = getattr(source, field_name, None)
            if ObjectReference in get_args(field_info.annotation):
                unprocessed_fields.append(field_name)
            elif not self.merge_field(destination, field_name, source_field_value, ignored):
                unprocessed_fields.append(field_name)

        # Merge extra attributes (ignored_fields also applies, so an attribute cannot
        # overwrite an ignored schema field of the same name)
        if merge_extra_attributes:
            for name, value in source_attributes.items():
                if not self.merge_field(destination, name, value, ignored):
                    unprocessed_fields.append(f"attributes.{name}")

        return unprocessed_fields
173
174
[docs]
class FeedMerger:
    """Merge the entities and relations of a source ``ColanderFeed`` into a destination feed.

    The destination feed is modified in place, and source entities may be inserted into it
    as-is. After :py:meth:`merge` runs, the instance keeps track of what happened:

    - ``id_rewrite``: source entity id -> id of the entity representing it in the destination
    - ``merged_entities``: source entity -> destination entity it was merged into
    - ``added_entities``: entities inserted into the destination feed
    - ``merging_candidates``: kept for API compatibility; not populated by :py:meth:`merge`
    """

    def __init__(self, source_feed: ColanderFeed, destination_feed: ColanderFeed):
        self.source_feed = source_feed
        self.destination_feed = destination_feed
        self.id_rewrite: Dict[UUID4, UUID4] = {}  # source, destination
        self.merging_candidates: Dict[Entity, Entity] = {}  # source, destination
        self.added_entities: List[Entity] = []  # source added to the destination feed
        self.merged_entities: Dict[Entity, Entity] = {}  # source, destination

    def merge(self, delete_unlinked: bool = False, aggressive: bool = False) -> None:
        """
        Merge the source feed into the destination feed.

        This method performs a comprehensive merge operation between two ColanderFeeds,
        handling entity merging, relation updates, and immutable relation management.
        The destination feed is modified in place. The merge process follows these main steps:

        1. Merge each source entity into its single similar destination entity, or add it to
           the destination feed when there is no unique match or it has immutable relations
        2. Add missing immutable relation targets and repoint references to merged entities
        3. Copy non-immutable relations from source to destination
        4. Delete (or report) relations whose endpoints are missing from the destination

        Args:
            delete_unlinked: If True, delete relations involving missing entities; otherwise
                an exception is raised when such relations exist.
            aggressive: If True, temporarily breaks and rebuilds immutable relations
                to allow more flexible merging. Use with caution as this may
                affect data integrity during the merge process. The immutable
                relations are rebuilt even if the merge fails part-way.

        Raises:
            Exception: If unlinked relations remain and ``delete_unlinked`` is False.
        """
        if aggressive:
            self.source_feed.break_immutable_relations()
            self.destination_feed.break_immutable_relations()
        try:
            self._merge_or_add_entities(BaseModelMerger(strategy=MergingStrategy.PRESERVE))
            self._update_immutable_references()
            self._copy_relations()
            self._handle_unlinked_relations(delete_unlinked)
        finally:
            # Always restore immutable relations so a failed merge does not leave both feeds broken.
            if aggressive:
                self.source_feed.rebuild_immutable_relations()
                self.destination_feed.rebuild_immutable_relations()

    def _merge_or_add_entities(self, model_merger: BaseModelMerger) -> None:
        """Merge each source entity into its unique similar destination entity, or add it to the destination."""
        for source_entity in self.source_feed.entities.values():
            destination_candidates = self.destination_feed.get_entities_similar_to(source_entity)
            # No candidate, several candidates, or any immutable relation: copy the entity unchanged.
            if len(destination_candidates) != 1 or len(source_entity.get_immutable_relations()) > 0:
                self.destination_feed.entities[str(source_entity.id)] = source_entity
                self.id_rewrite[source_entity.id] = source_entity.id
                self.added_entities.append(source_entity)
                continue
            # Exactly one candidate and no immutable relations: merge into it. The candidates
            # mapping is read, not popped, so the returned object is left untouched.
            destination_candidate = next(iter(destination_candidates.values()))
            model_merger.merge(source_entity, destination_candidate)
            destination_candidate.touch()
            self.id_rewrite[source_entity.id] = destination_candidate.id
            self.merged_entities[source_entity] = destination_candidate

    def _update_immutable_references(self) -> None:
        """Add missing immutable-relation targets to the destination and repoint references to merged entities."""
        from collections import deque

        # Walk a queue instead of iterating destination_feed.entities directly: adding a missing
        # target inserts into that dict, which would raise "dictionary changed size during iteration".
        # Newly added targets are queued too, so their own immutable relations are processed.
        pending = deque(self.destination_feed.entities.values())
        while pending:
            destination_entity = pending.popleft()
            for immutable_relation in destination_entity.get_immutable_relations().values():
                target = immutable_relation.obj_to
                if target in self.merged_entities:
                    # The relation target has been merged: update the reference
                    self._replace_reference(
                        destination_entity, immutable_relation.name, target, self.merged_entities[target]
                    )
                elif target not in self.added_entities and str(target.id) not in self.destination_feed.entities:
                    # The relation target is missing from the destination feed: add it
                    self.destination_feed.entities[str(target.id)] = target
                    self.added_entities.append(target)
                    pending.append(target)

    @staticmethod
    def _replace_reference(entity: Entity, field_name: str, old: Entity, new: Entity) -> None:
        """Make ``entity.<field_name>`` refer to ``new`` wherever it currently refers to ``old``."""
        current = getattr(entity, field_name)
        if not current:
            return
        if isinstance(current, list):
            # Replace in place (order preserved) instead of remove()+append(), which raises
            # ValueError when ``old`` is no longer present in the list.
            for index, item in enumerate(current):
                if item == old:
                    current[index] = new
        else:
            setattr(entity, field_name, new)

    def _copy_relations(self) -> None:
        """Copy source relations into the destination, rewired to merged entities, unless an equivalent exists.

        Only the id, name and endpoints of a source relation are carried over to the new relation.
        """
        for source_relation in self.source_feed.relations.values():
            obj_from = self.merged_entities.get(source_relation.obj_from, source_relation.obj_from)
            obj_to = self.merged_entities.get(source_relation.obj_to, source_relation.obj_to)
            outgoing = self.destination_feed.get_outgoing_relations(obj_from, exclude_immutables=True)
            relation_exists = any(
                relation.obj_to == obj_to and relation.name == source_relation.name for relation in outgoing.values()
            )
            if not relation_exists:
                self.destination_feed.relations[str(source_relation.id)] = EntityRelation(
                    id=source_relation.id,
                    name=source_relation.name,
                    obj_from=obj_from,
                    obj_to=obj_to,
                )

    def _handle_unlinked_relations(self, delete_unlinked: bool) -> None:
        """Delete, or raise on, destination relations whose endpoints are not both in the destination feed."""
        # Keep the dict keys themselves so pop() cannot fail with KeyError if a relation
        # is stored under a key other than str(relation.id).
        unlinked_keys: List[str] = [
            key
            for key, relation in self.destination_feed.relations.items()
            if not self.destination_feed.contains(relation.obj_from)
            or not self.destination_feed.contains(relation.obj_to)
        ]
        if delete_unlinked:
            for key in unlinked_keys:
                self.destination_feed.relations.pop(key)
        elif unlinked_keys:
            raise Exception(f"{len(unlinked_keys)} unlinked relation detected")