from enum import Enum
from typing import (
    Generic,
    List,
    Optional,
    Set,
    TYPE_CHECKING,
    Type,
    TypeVar,
    Union,
    cast,
)

import ormar  # noqa I100
from ormar.exceptions import RelationshipInstanceError  # noqa I100
from ormar.relations.relation_proxy import RelationProxy

if TYPE_CHECKING:  # pragma no cover
    from ormar.relations import RelationsManager
    from ormar.models import Model, NewBaseModel, T
else:
    T = TypeVar("T", bound="Model")


class RelationType(Enum):
    """
    Different types of relations supported by ormar:

    *  ForeignKey = PRIMARY
    *  reverse ForeignKey = REVERSE
    *  ManyToMany = MULTIPLE
    *  THROUGH = kept as a single related model, the same way as PRIMARY
       (presumably the link to the through model of a ManyToMany relation
       - confirm against the fields that register this type)
    """

    PRIMARY = 1
    REVERSE = 2
    MULTIPLE = 3
    THROUGH = 4


class Relation(Generic[T]):
    """
    Keeps related Models and handles adding/removing of the children.
    """

    # Relation types that hold exactly one related model instead of a
    # RelationProxy. Shared by clear/add/remove so they cannot drift apart.
    _SINGLE_MODEL_TYPES = (RelationType.PRIMARY, RelationType.THROUGH)

    def __init__(
        self,
        manager: "RelationsManager",
        type_: RelationType,
        field_name: str,
        to: Type["T"],
        through: Optional[Type["Model"]] = None,
    ) -> None:
        """
        Initialize the Relation and keep the related models either as instances of
        passed Model, or as a RelationProxy which is basically a list of models with
        some special behavior, as it exposes QuerySetProxy and allows querying the
        related models already pre filtered by parent model.

        :param manager: reference to relation manager
        :type manager: RelationsManager
        :param type_: type of the relation
        :type type_: RelationType
        :param field_name: name of the relation field
        :type field_name: str
        :param to: model to which relation leads to
        :type to: Type[Model]
        :param through: model through which relation goes for m2m relations
        :type through: Optional[Type[Model]]
        """
        self.manager = manager
        self._owner: "Model" = manager.owner
        self._type: RelationType = type_
        # indexes of children whose weak references were found dead
        self._to_remove: Set = set()
        self.to: Type["T"] = to
        self._through: Optional[Type["Model"]] = through
        self.field_name: str = field_name
        # REVERSE and MULTIPLE relations keep many children in a RelationProxy,
        # the remaining types start empty and later hold a single model.
        self.related_models: Optional[Union[RelationProxy, "Model"]] = (
            RelationProxy(relation=self, type_=type_, to=to, field_name=field_name)
            if type_ in (RelationType.REVERSE, RelationType.MULTIPLE)
            else None
        )

    def clear(self) -> None:
        """
        Drops all related models kept by this relation.

        For single model relations the related model is set to None, for
        proxy based relations the RelationProxy is cleared. In both cases the
        owner's ``__dict__`` entry for the relation field is set to None.
        """
        if self._type in self._SINGLE_MODEL_TYPES:
            self.related_models = None
            self._owner.__dict__[self.field_name] = None
        elif self.related_models is not None:
            related_models = cast("RelationProxy", self.related_models)
            related_models._clear()
            self._owner.__dict__[self.field_name] = None

    @property
    def through(self) -> Type["Model"]:
        """
        Returns the through model passed when the relation was created.

        :raises RelationshipInstanceError: if no through model was set
        :return: through model of the relation
        :rtype: Type[Model]
        """
        if not self._through:  # pragma: no cover
            raise RelationshipInstanceError("Relation does not have through model!")
        return self._through

    def _clean_related(self) -> None:
        """
        Removes dead weakrefs from RelationProxy.

        Rebuilds the RelationProxy without the children whose indexes were
        collected in ``_to_remove`` and stores the same cleaned list in the
        owner's ``__dict__``.
        """
        cleaned_data = [
            x
            for i, x in enumerate(self.related_models)  # type: ignore
            if i not in self._to_remove
        ]
        self.related_models = RelationProxy(
            relation=self,
            type_=self._type,
            to=self.to,
            field_name=self.field_name,
            data_=cleaned_data,
        )
        relation_name = self.field_name
        self._owner.__dict__[relation_name] = cleaned_data
        self._to_remove = set()

    def _find_existing(
        self, child: Union["NewBaseModel", Type["NewBaseModel"]]
    ) -> Optional[int]:
        """
        Find child model in RelationProxy if exists.

        :raises ValueError: if the relation does not keep a RelationProxy
        :param child: child model to find
        :type child: Model
        :return: index of child in RelationProxy
        :rtype: Optional[int]
        """
        if not isinstance(self.related_models, RelationProxy):  # pragma nocover
            raise ValueError("Cannot find existing models in parent relation type")
        # purge children marked as dead by a previous search first, so the
        # returned index refers to the current content of the proxy
        if self._to_remove:
            self._clean_related()
        for ind, relation_child in enumerate(self.related_models[:]):
            try:
                if relation_child == child:
                    return ind
            except ReferenceError:  # pragma no cover
                # comparison hit a dead weak reference - schedule its removal
                self._to_remove.add(ind)
        return None

    def add(self, child: "Model") -> None:
        """
        Adds child Model to relation, either sets child as related model or adds
        it to the list in RelationProxy depending on relation type.

        :param child: model to add to relation
        :type child: Model
        """
        relation_name = self.field_name
        if self._type in self._SINGLE_MODEL_TYPES:
            self.related_models = child
            self._owner.__dict__[relation_name] = child
        else:
            if self._find_existing(child) is None:
                self.related_models.append(child)  # type: ignore
                # keep the owner's __dict__ entry in step with the proxy; the
                # entry can be missing, None or a single non-list value
                rel = self._owner.__dict__.get(relation_name, [])
                rel = rel or []
                if not isinstance(rel, list):
                    rel = [rel]
                rel.append(child)
                self._owner.__dict__[relation_name] = rel

    def remove(self, child: Union["NewBaseModel", Type["NewBaseModel"]]) -> None:
        """
        Removes child Model from relation, either sets None as related model or removes
        it from the list in RelationProxy depending on relation type.

        Single model relations (PRIMARY and THROUGH, the same set that add() and
        clear() treat as single) are only changed when the stored model equals
        the child.

        :param child: model to remove from relation
        :type child: Model
        """
        relation_name = self.field_name
        if self._type in self._SINGLE_MODEL_TYPES:
            if self.related_models == child:
                self.related_models = None
                # pop with default: a missing entry must not raise KeyError
                self._owner.__dict__.pop(relation_name, None)
        else:
            position = self._find_existing(child)
            if position is not None:
                self.related_models.pop(position)  # type: ignore
                del self._owner.__dict__[relation_name][position]

    def get(self) -> Optional[Union[List["Model"], "Model"]]:
        """
        Return the related model or models from RelationProxy.

        :return: related model/models if set
        :rtype: Optional[Union[List[Model], Model]]
        """
        return self.related_models

    def __repr__(self) -> str:  # pragma no cover
        """
        Return string form of the related models, purging dead children first.
        """
        if self._to_remove:
            self._clean_related()
        return str(self.related_models)