1import enum
2from typing import get_args, List, Any, Optional
3
4from pydantic import BaseModel
5
6from colander_data_converter.base.common import ObjectReference
7
8
[docs]
class MergingStrategy(str, enum.Enum):
    """Policy that decides whether a merge may replace existing destination data.

    The ``str`` mixin makes every member compare equal to its plain string
    value, so a member can be used wherever the raw string is expected.
    """

    # Keep whatever the destination already holds; only fill in empty fields.
    PRESERVE = "preserve"
    # Let the source value replace the destination value.
    OVERWRITE = "overwrite"
12
13
[docs]
class BaseModelMerger:
    """
    A utility class for merging :py:class:`pydantic.BaseModel` instances with configurable strategies.

    This class provides functionality to merge fields from a source BaseModel into a
    destination BaseModel, handling both regular model fields and extra attributes.
    Fields containing `ObjectReference` types are automatically
    excluded from merging and reported as unprocessed.

    The merger supports two strategies:

    - ``PRESERVE``: Only merge fields if the destination field is empty or `None`
    - ``OVERWRITE``: Always merge fields from source to destination

    The strategy applies both to schema fields and to entries of the destination's
    ``attributes`` dictionary.

    Fields are merged based on type compatibility and field constraints. Extra
    attributes are automatically converted to strings when stored in the attributes
    dictionary (if supported by the destination model).

    Example:
        >>> from pydantic import BaseModel
        >>> class SourceModel(BaseModel):
        ...     name: str
        ...     age: int
        ...     attributes: dict = {}
        >>> class DestinationModel(BaseModel):
        ...     name: str
        ...     age: int
        ...     city: str = "Unknown"
        ...     attributes: dict = {}
        >>> source = SourceModel(name="Alice", age=30)
        >>> destination = DestinationModel(name="Bob", age=25)
        >>> merger = BaseModelMerger(strategy=MergingStrategy.OVERWRITE)
        >>> unprocessed = merger.merge(source, destination)
        >>> print(destination.name)
        Alice
        >>> print(destination.age)
        30
        >>> print(destination.city)
        Unknown

    Note:
        - Fields with ``ObjectReference`` types are never merged and are reported as unprocessed
        - Frozen fields cannot be modified and will be reported as unprocessed
        - Falsy source values (``None``, ``0``, ``False``, empty strings and containers) are
          skipped and reported as unprocessed
        - Complex types (list, dict, tuple, set) in extra attributes are not supported
        - Extra attributes are converted to strings when stored
    """

    def __init__(self, strategy: MergingStrategy = MergingStrategy.OVERWRITE):
        """Initialize the ``BaseModelMerger`` with a merging strategy.

        Args:
            strategy: The strategy to use when merging fields.
        """
        self.strategy = strategy

    def merge_field(
        self, destination: BaseModel, field_name: str, field_value: Any, ignored_fields: Optional[List[str]] = None
    ) -> bool:
        """Merge a single value into the destination model.

        The value is written either to the schema field named ``field_name`` or,
        when the destination declares no such field, to the destination's
        ``attributes`` mapping (as a string).

        Note:
            The method follows these rules:

            - Skips falsy values (``None``, ``0``, ``False``, empty strings/containers)
            - Skips fields listed in ``ignored_fields``
            - For schema fields: merges only if the field annotation does not
              contain ``ObjectReference``, the field is not frozen, the destination
              value is empty (``PRESERVE``) or the strategy is ``OVERWRITE``, and the
              exact type of the value matches the annotation or one of its arguments
            - For names not in the destination schema: stores ``str(value)`` in
              ``destination.attributes`` (if the destination has that attribute)
              unless the value is a list, dict, tuple, set, ``ObjectReference`` or
              a ``BaseModel``. Under ``PRESERVE`` an existing non-empty attribute
              is kept untouched. A ``None`` attributes mapping is replaced by an
              empty dict before the value is stored.

        Args:
            destination: The target model to merge into.
            field_name: The name of the field to merge.
            field_value: The value to merge from the source.
            ignored_fields: List of field names to skip during merging.

        Returns:
            True if the value was written to the destination, False if it was
            skipped or could not be merged.
        """
        # Any falsy value is treated as "nothing to merge".
        if not field_value:
            return False
        if ignored_fields and field_name in ignored_fields:
            return False

        value_type = type(field_value)
        overwrite = self.strategy == MergingStrategy.OVERWRITE
        model_fields = destination.__class__.model_fields

        if field_name in model_fields:
            field_info = model_fields[field_name]
            annotation_args = get_args(field_info.annotation)
            # Reference fields are never merged by this class.
            if ObjectReference in annotation_args or List[ObjectReference] in annotation_args:
                return False
            if field_info.frozen:
                return False
            # PRESERVE keeps any non-empty destination value.
            if not overwrite and getattr(destination, field_name, None):
                return False
            # NOTE(review): get_args() also returns the parameters of non-Union
            # generics (e.g. ``(str,)`` for ``List[str]``), so a ``str`` value passes
            # this check for a ``List[str]`` field - confirm whether that is intended.
            if value_type is not field_info.annotation and value_type not in annotation_args:
                return False
            setattr(destination, field_name, field_value)
            return True

        # Not a schema field: fall back to the free-form attributes mapping.
        if not hasattr(destination, "attributes"):
            return False
        # Exact-type membership test is kept on purpose: ObjectReference may be a
        # typing alias rather than a class, which isinstance() would reject.
        if value_type in (list, dict, tuple, set, ObjectReference) or isinstance(field_value, BaseModel):
            return False
        if destination.attributes is None:
            # Allow direct calls on a destination whose attributes were never set.
            destination.attributes = {}
        # Honour PRESERVE for extra attributes too: never clobber an existing value.
        if not overwrite and destination.attributes.get(field_name):
            return False
        destination.attributes[field_name] = str(field_value)
        return True

    def merge(self, source: BaseModel, destination: BaseModel, ignored_fields: Optional[List[str]] = None) -> List[str]:
        """Merge all compatible fields from the source object into the destination object.

        Every field declared on the source model is offered to
        :py:meth:`merge_field`, followed by every entry of the source's
        ``attributes`` mapping (if it has a non-empty one). ``ignored_fields`` is
        honoured for both passes.

        Args:
            source: The source model to merge from
            destination: The destination model to merge to
            ignored_fields: List of field names to skip during merging

        Returns:
            A list of field names that were not merged. Source fields whose
            annotation contains ``ObjectReference`` are always added to this list,
            as are ignored, falsy and otherwise unmergeable fields. Entries coming
            from the source's ``attributes`` mapping are reported as
            ``"attributes.<name>"``.
        """
        unprocessed_fields: List[str] = []

        # Make sure extra attributes can be stored when the destination supports them.
        if hasattr(destination, "attributes") and destination.attributes is None:
            destination.attributes = {}

        # Merge model fields
        for field_name, field_info in source.__class__.model_fields.items():
            if ObjectReference in get_args(field_info.annotation):
                unprocessed_fields.append(field_name)
                continue
            value = getattr(source, field_name, None)
            if not self.merge_field(destination, field_name, value, ignored_fields):
                unprocessed_fields.append(field_name)

        # Merge extra attributes
        source_attributes = getattr(source, "attributes", None)
        if source_attributes:
            for name, value in source_attributes.items():
                # Forward ignored_fields so an ignored name cannot sneak in via attributes.
                if not self.merge_field(destination, name, value, ignored_fields):
                    unprocessed_fields.append(f"attributes.{name}")

        return unprocessed_fields
170
171 return unprocessed_fields