Source code for colander_data_converter.base.utils

  1import enum
  2from typing import get_args, List, Any, Optional
  3
  4from pydantic import BaseModel
  5
  6from colander_data_converter.base.common import ObjectReference
  7
  8
[docs] 9class MergingStrategy(str, enum.Enum): 10 PRESERVE = "preserve" 11 OVERWRITE = "overwrite"
12 13
[docs] 14class BaseModelMerger: 15 """ 16 A utility class for merging :py:class:`pydantic.BaseModel` instances with configurable strategies. 17 18 This class provides functionality to merge fields from a source BaseModel into a 19 destination BaseModel, handling both regular model fields and extra attributes. 20 Fields containing `ObjectReference` types are automatically 21 excluded from merging and reported as unprocessed. 22 23 The merger supports two strategies: 24 25 - ``PRESERVE``: Only merge fields if the destination field is empty or `None` 26 - ``OVERWRITE``: Always merge fields from source to destination 27 28 Fields are merged based on type compatibility and field constraints. Extra 29 attributes are automatically converted to strings when stored in the attributes 30 dictionary (if supported by the destination model). 31 32 Example: 33 >>> from pydantic import BaseModel 34 >>> class SourceModel(BaseModel): 35 ... name: str 36 ... age: int 37 ... attributes: dict = {} 38 >>> class DestinationModel(BaseModel): 39 ... name: str 40 ... age: int 41 ... city: str = "Unknown" 42 ... attributes: dict = {} 43 >>> source = SourceModel(name="Alice", age=30) 44 >>> destination = DestinationModel(name="Bob", age=25) 45 >>> merger = BaseModelMerger(strategy=MergingStrategy.OVERWRITE) 46 >>> unprocessed = merger.merge(source, destination) 47 >>> print(destination.name) 48 Alice 49 >>> print(destination.age) 50 30 51 >>> print(destination.city) 52 Unknown 53 54 Note: 55 - Fields with ``ObjectReference`` types are never merged and are reported as unprocessed 56 - Frozen fields cannot be modified and will be reported as unprocessed 57 - Complex types (list, dict, tuple, set) in extra attributes are not supported 58 - Extra attributes are converted to strings when stored 59 """ 60
[docs] 61 def __init__(self, strategy: MergingStrategy = MergingStrategy.OVERWRITE): 62 """Initialize the ``BaseModelMerger`` with a merging strategy. 63 64 Args: 65 strategy: The strategy to use when merging fields. 66 """ 67 self.strategy = strategy
68
[docs] 69 def merge_field( 70 self, destination: BaseModel, field_name: str, field_value: Any, ignored_fields: Optional[List[str]] = None 71 ) -> bool: 72 """Merge a single field from source to destination model. 73 74 This method handles the logic for merging individual fields, including 75 type checking, field existence validation, and attribute handling. It 76 processes both regular model fields and extra attributes based on the 77 destination model's capabilities and field constraints. 78 79 Note: 80 The method follows these rules: 81 82 - Skips fields listed in ignored_fields 83 - Skips empty/None field values 84 - For fields not in the destination model schema: stores as string in 85 attributes dict (if supported) unless the value is a complex type 86 - For schema fields: merges only if type-compatible, not frozen, not 87 containing ObjectReference, and destination is empty (``PRESERVE``) or 88 strategy is ``OVERWRITE`` 89 90 Args: 91 destination: The target model to merge into. 92 field_name: The name of the field to merge. 93 field_value: The value to merge from the source. 94 ignored_fields: List of field names to skip during merging. 95 96 Returns: 97 True if the field was processed (successfully merged or handled), 98 False if the field could not be processed 99 """ 100 field_processed = False 101 if not field_value: 102 return field_processed 103 if not ignored_fields: 104 ignored_fields = [] 105 extra_attributes_supported = hasattr(destination, "attributes") 106 source_field_value = field_value 107 source_field_value_type = type(field_value) 108 if field_name in ignored_fields: 109 return field_processed 110 # Append in extra attribute dict if supported 111 if ( 112 field_name not in destination.__class__.model_fields 113 and extra_attributes_supported 114 and source_field_value_type not in [list, dict, tuple, set, ObjectReference] 115 and not isinstance(source_field_value, BaseModel) 116 ): 117 destination.attributes[field_name] = str(source_field_value) 118 field_processed = True 119 elif field_name in destination.__class__.model_fields: 120 field_info = destination.__class__.model_fields[field_name] 121 annotation_args = get_args(field_info.annotation) or [] # type: ignore[var-annotated] 122 if ( 123 ObjectReference not in annotation_args 124 and List[ObjectReference] not in annotation_args 125 and not field_info.frozen 126 and (not getattr(destination, field_name, None) or self.strategy == MergingStrategy.OVERWRITE) 127 and (source_field_value_type is field_info.annotation or source_field_value_type in annotation_args) 128 ): 129 setattr(destination, field_name, source_field_value) 130 field_processed = True 131 return field_processed
132
[docs] 133 def merge(self, source: BaseModel, destination: BaseModel, ignored_fields: Optional[List[str]] = None) -> List[str]: 134 """Merge all compatible fields from the source object into the destination object. 135 136 This method iterates through all fields in the source object and attempts 137 to merge them into the destination object. It handles both regular object 138 fields and extra attributes dictionary if supported. 139 140 Args: 141 source: The source model to merge from 142 destination: The destination model to merge to 143 ignored_fields: List of field names to skip during merging 144 145 Returns: 146 A list of field names that could not be processed during 147 the merge operation. Fields containing ObjectReference types 148 are automatically added to this list. 149 """ 150 unprocessed_fields = [] 151 source_attributes = getattr(source, "attributes", None) 152 destination_attributes = getattr(destination, "attributes", None) 153 154 if destination_attributes is None and hasattr(destination, "attributes"): 155 destination.attributes = {} 156 157 # Merge model fields 158 for field_name, field_info in source.__class__.model_fields.items(): 159 source_field_value = getattr(source, field_name, None) 160 if ObjectReference in get_args(field_info.annotation): 161 unprocessed_fields.append(field_name) 162 elif not self.merge_field(destination, field_name, source_field_value, ignored_fields): 163 unprocessed_fields.append(field_name) 164 165 # Merge extra attributes 166 if source_attributes: 167 for name, value in source_attributes.items(): 168 if not self.merge_field(destination, name, value): 169 unprocessed_fields.append(f"attributes.{name}") 170 171 return unprocessed_fields