1from enum import Enum
2from typing import (
3    Generic,
4    List,
5    Optional,
6    Set,
7    TYPE_CHECKING,
8    Type,
9    TypeVar,
10    Union,
11    cast,
12)
13
14import ormar  # noqa I100
15from ormar.exceptions import RelationshipInstanceError  # noqa I100
16from ormar.relations.relation_proxy import RelationProxy
17
18if TYPE_CHECKING:  # pragma no cover
19    from ormar.relations import RelationsManager
20    from ormar.models import Model, NewBaseModel, T
21else:
22    T = TypeVar("T", bound="Model")
23
24
class RelationType(Enum):
    """
    Kinds of relations a ``Relation`` object can represent in ormar.

    *  PRIMARY - the ForeignKey side of a relation
    *  REVERSE - the reverse side of a ForeignKey
    *  MULTIPLE - a ManyToMany relation
    *  THROUGH - presumably the link to the through model of a ManyToMany
       relation (the original docs do not describe it - confirm)
    """

    # ForeignKey
    PRIMARY = 1
    # reverse ForeignKey
    REVERSE = 2
    # ManyToMany
    MULTIPLE = 3
    # NOTE(review): undocumented in the original member list, see docstring
    THROUGH = 4
38
39
class Relation(Generic[T]):
    """
    Keeps related Models and handles adding/removing of the children.

    PRIMARY and THROUGH relations keep a single related model (or None), while
    REVERSE and MULTIPLE relations keep a RelationProxy with many models.
    Every change is mirrored into the owner's ``__dict__`` under the name of
    the relation field.
    """

    def __init__(
        self,
        manager: "RelationsManager",
        type_: RelationType,
        field_name: str,
        to: Type["T"],
        through: Optional[Type["Model"]] = None,
    ) -> None:
        """
        Initialize the Relation and keep the related models either as instances of
        passed Model, or as a RelationProxy which is basically a list of models with
        some special behavior, as it exposes QuerySetProxy and allows querying the
        related models already pre filtered by parent model.

        :param manager: reference to relation manager
        :type manager: RelationsManager
        :param type_: type of the relation
        :type type_: RelationType
        :param field_name: name of the relation field
        :type field_name: str
        :param to: model to which relation leads to
        :type to: Type[Model]
        :param through: model through which relation goes for m2m relations
        :type through: Optional[Type[Model]]
        """
        self.manager = manager
        self._owner: "Model" = manager.owner
        self._type: RelationType = type_
        # indexes of RelationProxy entries that turned out to be dead weakrefs
        self._to_remove: Set = set()
        self.to: Type["T"] = to
        self._through = through
        self.field_name: str = field_name
        # list-like relations start with an empty proxy, single ones with None
        self.related_models: Optional[Union[RelationProxy, "Model"]] = (
            RelationProxy(relation=self, type_=type_, to=to, field_name=field_name)
            if type_ in (RelationType.REVERSE, RelationType.MULTIPLE)
            else None
        )

    @property
    def _holds_single_model(self) -> bool:
        """
        Tells if the relation keeps one model instead of a RelationProxy.

        Shared by add/remove/clear so all three treat the same relation types
        as single-model relations.

        :return: True for PRIMARY and THROUGH relations
        :rtype: bool
        """
        return self._type in (RelationType.PRIMARY, RelationType.THROUGH)

    def clear(self) -> None:
        """
        Drops all related models and sets the owner's attribute to None.

        For single-model relations the related model is set to None, for the
        other relation types the existing RelationProxy is cleared (if any).
        """
        if self._holds_single_model:
            self.related_models = None
            self._owner.__dict__[self.field_name] = None
        elif self.related_models is not None:
            related_models = cast("RelationProxy", self.related_models)
            related_models._clear()
            self._owner.__dict__[self.field_name] = None

    @property
    def through(self) -> Type["Model"]:
        """
        Returns the through model of the relation.

        :raises RelationshipInstanceError: if no through model was provided
        :return: through model passed during initialization
        :rtype: Type[Model]
        """
        if self._through is None:  # pragma: no cover
            raise RelationshipInstanceError("Relation does not have through model!")
        return self._through

    def _clean_related(self) -> None:
        """
        Removes dead weakrefs from RelationProxy.

        Rebuilds the proxy without the entries whose indexes were collected in
        ``_to_remove``, stores the same cleaned list in the owner's ``__dict__``
        and resets the set of indexes.
        """
        cleaned_data = [
            x
            for i, x in enumerate(self.related_models)  # type: ignore
            if i not in self._to_remove
        ]
        self.related_models = RelationProxy(
            relation=self,
            type_=self._type,
            to=self.to,
            field_name=self.field_name,
            data_=cleaned_data,
        )
        relation_name = self.field_name
        self._owner.__dict__[relation_name] = cleaned_data
        self._to_remove = set()

    def _find_existing(
        self, child: Union["NewBaseModel", Type["NewBaseModel"]]
    ) -> Optional[int]:
        """
        Find child model in RelationProxy if exists.

        :param child: child model to find
        :type child: Union[NewBaseModel, Type[NewBaseModel]]
        :raises ValueError: if related models are not kept in a RelationProxy
        :return: index of child in RelationProxy
        :rtype: Optional[int]
        """
        if not isinstance(self.related_models, RelationProxy):  # pragma nocover
            raise ValueError("Cannot find existing models in parent relation type")
        # prune entries found dead during a previous lookup before searching
        if self._to_remove:
            self._clean_related()
        for ind, relation_child in enumerate(self.related_models[:]):
            try:
                if relation_child == child:
                    return ind
            except ReferenceError:  # pragma no cover
                # dead reference - remember its index, it is dropped on next call
                self._to_remove.add(ind)
        return None

    def add(self, child: "Model") -> None:
        """
        Adds child Model to relation, either sets child as related model or adds
        it to the list in RelationProxy depending on relation type.

        A child already present in the RelationProxy is not added again.

        :param child: model to add to relation
        :type child: Model
        """
        relation_name = self.field_name
        if self._holds_single_model:
            self.related_models = child
            self._owner.__dict__[relation_name] = child
        else:
            if self._find_existing(child) is None:
                self.related_models.append(child)  # type: ignore
                rel = self._owner.__dict__.get(relation_name, [])
                # the attribute may be missing, None or a single model
                rel = rel or []
                if not isinstance(rel, list):
                    rel = [rel]
                rel.append(child)
                self._owner.__dict__[relation_name] = rel

    def remove(self, child: Union["NewBaseModel", Type["NewBaseModel"]]) -> None:
        """
        Removes child Model from relation, either sets None as related model or removes
        it from the list in RelationProxy depending on relation type.

        PRIMARY and THROUGH relations are both handled as single-model
        relations, same as in add and clear. Nothing happens if the child is
        not part of the relation.

        :param child: model to remove from relation
        :type child: Union[NewBaseModel, Type[NewBaseModel]]
        """
        relation_name = self.field_name
        if self._holds_single_model:
            if self.related_models == child:
                self.related_models = None
                del self._owner.__dict__[relation_name]
        else:
            position = self._find_existing(child)
            if position is not None:
                self.related_models.pop(position)  # type: ignore
                del self._owner.__dict__[relation_name][position]

    def get(self) -> Optional[Union[List["Model"], "Model"]]:
        """
        Return the related model or models from RelationProxy.

        :return: related model/models if set
        :rtype: Optional[Union[List[Model], Model]]
        """
        return self.related_models

    def __repr__(self) -> str:  # pragma no cover
        """
        Returns string representation of the related models.

        Entries already marked as dead are pruned first.

        :return: str() of related model/models
        :rtype: str
        """
        if self._to_remove:
            self._clean_related()
        return str(self.related_models)
195