1from sqlalchemy import ForeignKey
2from sqlalchemy import Integer
3from sqlalchemy import testing
4from sqlalchemy.orm import attributes
5from sqlalchemy.orm import class_mapper
6from sqlalchemy.orm import exc as orm_exc
7from sqlalchemy.orm import sync
8from sqlalchemy.orm import unitofwork
9from sqlalchemy.testing import assert_raises_message
10from sqlalchemy.testing import eq_
11from sqlalchemy.testing import fixtures
12from sqlalchemy.testing.fixtures import fixture_session
13from sqlalchemy.testing.schema import Column
14from sqlalchemy.testing.schema import Table
15
16
class AssertsUOW(object):
    """Mixin providing a helper that builds a unit of work for a session."""

    def _get_test_uow(self, session):
        """Return a new ``UOWTransaction`` populated from ``session``.

        States found in ``session._new`` and ``session._dirty_states``
        (minus anything also in ``session._deleted``) are registered
        normally; states in ``session._deleted`` are registered with
        ``isdelete=True``.
        """
        transaction = unitofwork.UOWTransaction(session)

        to_delete = set(session._deleted)
        pending = set(session._new)
        # A state marked for deletion is never treated as merely dirty.
        modified = set(session._dirty_states) - to_delete

        for state in pending | modified:
            transaction.register_object(state)
        for state in to_delete:
            transaction.register_object(state, isdelete=True)

        return transaction
28
29
class SyncTest(
    fixtures.MappedTest, testing.AssertsExecutionResults, AssertsUOW
):
    """Tests that call the functions in ``sqlalchemy.orm.sync`` directly.

    Table ``t1`` is mapped imperatively to class ``A`` and ``t2`` to class
    ``B``.  Each test builds its own list of ``(source, dest)`` column
    pairs and invokes a single ``sync`` function on the instance states
    returned by :meth:`_fixture`.
    """

    @classmethod
    def define_tables(cls, metadata):
        """Define ``t1`` (id, foo) and ``t2`` (id, t1id) on ``metadata``."""
        Table(
            "t1",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("foo", Integer),
        )
        Table(
            "t2",
            metadata,
            # t2.id is both the primary key and a foreign key to t1.id.
            Column("id", Integer, ForeignKey("t1.id"), primary_key=True),
            Column("t1id", Integer, ForeignKey("t1.id")),
        )

    @classmethod
    def setup_classes(cls):
        """Declare the empty ``A`` and ``B`` classes used by the mappers."""

        class A(cls.Basic):
            pass

        class B(cls.Basic):
            pass

    @classmethod
    def setup_mappers(cls):
        """Map ``A`` to ``t1`` and ``B`` to ``t2`` imperatively."""
        cls.mapper_registry.map_imperatively(cls.classes.A, cls.tables.t1)
        cls.mapper_registry.map_imperatively(cls.classes.B, cls.tables.t2)

    def _fixture(self):
        """Return ``(uowcommit, a1_state, b1_state, a_mapper, b_mapper)``.

        A fresh ``A`` and ``B`` instance are created and stored on ``self``
        as ``a1`` / ``b1``; they are not added to the session here.  The
        second and third tuple members are the instance *states* of those
        objects, not the objects themselves.
        """
        A, B = self.classes.A, self.classes.B
        session = fixture_session()
        a_mapper = class_mapper(A)
        b_mapper = class_mapper(B)
        self.a1 = a1 = A()
        self.b1 = b1 = B()
        # The unit of work is built exactly once, after the instances
        # exist.  An earlier duplicate construction whose result was
        # immediately overwritten has been removed.
        uowcommit = self._get_test_uow(session)
        return (
            uowcommit,
            attributes.instance_state(a1),
            attributes.instance_state(b1),
            a_mapper,
            b_mapper,
        )

    def test_populate(self):
        """``populate`` copies ``a1.id`` to ``b1.id``; no cascade flag."""
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        pairs = [(a_mapper.c.id, b_mapper.c.id)]
        a1.obj().id = 7
        assert "id" not in b1.obj().__dict__
        sync.populate(a1, a_mapper, b1, b_mapper, pairs, uowcommit, False)
        eq_(b1.obj().id, 7)
        eq_(b1.obj().__dict__["id"], 7)
        # With the final argument False, nothing is recorded in the uow.
        assert ("pk_cascaded", b1, b_mapper.c.id) not in uowcommit.attributes

    def test_populate_flag_cascaded(self):
        """``populate`` with the flag True records a ``pk_cascaded`` key."""
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        pairs = [(a_mapper.c.id, b_mapper.c.id)]
        a1.obj().id = 7
        assert "id" not in b1.obj().__dict__
        sync.populate(a1, a_mapper, b1, b_mapper, pairs, uowcommit, True)
        eq_(b1.obj().id, 7)
        eq_(b1.obj().__dict__["id"], 7)
        eq_(uowcommit.attributes[("pk_cascaded", b1, b_mapper.c.id)], True)

    def test_populate_unmapped_source(self):
        """``populate`` raises when the source column is not on ``A``."""
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        # The source column deliberately comes from the wrong mapper.
        pairs = [(b_mapper.c.id, b_mapper.c.id)]
        assert_raises_message(
            orm_exc.UnmappedColumnError,
            "Can't execute sync rule for source column 't2.id'; "
            r"mapper 'mapped class A->t1' does not map this column.",
            sync.populate,
            a1,
            a_mapper,
            b1,
            b_mapper,
            pairs,
            uowcommit,
            False,
        )

    def test_populate_unmapped_dest(self):
        """``populate`` raises when the destination column is not on ``B``."""
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        # The destination column deliberately comes from the wrong mapper.
        pairs = [(a_mapper.c.id, a_mapper.c.id)]
        assert_raises_message(
            orm_exc.UnmappedColumnError,
            r"Can't execute sync rule for destination "
            r"column 't1.id'; "
            r"mapper 'mapped class B->t2' does not map this column.",
            sync.populate,
            a1,
            a_mapper,
            b1,
            b_mapper,
            pairs,
            uowcommit,
            False,
        )

    def test_clear(self):
        """``clear`` sets the non-primary-key ``t1id`` on ``b1`` to None."""
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        pairs = [(a_mapper.c.id, b_mapper.c.t1id)]
        b1.obj().t1id = 8
        eq_(b1.obj().__dict__["t1id"], 8)
        sync.clear(b1, b_mapper, pairs)
        eq_(b1.obj().__dict__["t1id"], None)

    def test_clear_pk(self):
        """``clear`` raises ``AssertionError`` for a primary key column."""
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        pairs = [(a_mapper.c.id, b_mapper.c.id)]
        b1.obj().id = 8
        eq_(b1.obj().__dict__["id"], 8)
        assert_raises_message(
            AssertionError,
            "Dependency rule tried to blank-out primary key "
            "column 't2.id' on instance '<B",
            sync.clear,
            b1,
            b_mapper,
            pairs,
        )

    def test_clear_unmapped(self):
        """``clear`` raises when the destination column is not on ``B``."""
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        pairs = [(a_mapper.c.id, a_mapper.c.foo)]
        assert_raises_message(
            orm_exc.UnmappedColumnError,
            "Can't execute sync rule for destination "
            r"column 't1.foo'; mapper 'mapped class B->t2' does not "
            "map this column.",
            sync.clear,
            b1,
            b_mapper,
            pairs,
        )

    def test_update(self):
        """``update`` writes the current and the ``old_``-prefixed value."""
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        a1.obj().id = 10
        # After _commit_all, 10 is the value reported under "old_id" below.
        a1._commit_all(a1.dict)
        a1.obj().id = 12
        pairs = [(a_mapper.c.id, b_mapper.c.id)]
        dest = {}
        sync.update(a1, a_mapper, dest, "old_", pairs)
        eq_(dest, {"id": 12, "old_id": 10})

    def test_update_unmapped(self):
        """``update`` raises when the source column is not on ``A``."""
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        pairs = [(b_mapper.c.id, b_mapper.c.id)]
        dest = {}
        assert_raises_message(
            orm_exc.UnmappedColumnError,
            "Can't execute sync rule for source column 't2.id'; "
            r"mapper 'mapped class A->t1' does not map this column.",
            sync.update,
            a1,
            a_mapper,
            dest,
            "old_",
            pairs,
        )

    def test_populate_dict(self):
        """``populate_dict`` stores ``a1.id`` under the ``"id"`` key."""
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        a1.obj().id = 10
        pairs = [(a_mapper.c.id, b_mapper.c.id)]
        dest = {}
        sync.populate_dict(a1, a_mapper, dest, pairs)
        eq_(dest, {"id": 10})

    def test_populate_dict_unmapped(self):
        """``populate_dict`` raises when the source column is not on ``A``."""
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        a1.obj().id = 10
        pairs = [(b_mapper.c.id, b_mapper.c.id)]
        dest = {}
        assert_raises_message(
            orm_exc.UnmappedColumnError,
            "Can't execute sync rule for source column 't2.id'; "
            r"mapper 'mapped class A->t1' does not map this column.",
            sync.populate_dict,
            a1,
            a_mapper,
            dest,
            pairs,
        )

    def test_source_modified_unmodified(self):
        """A value set with no prior ``_commit_all`` reports False."""
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        a1.obj().id = 10
        pairs = [(a_mapper.c.id, b_mapper.c.id)]
        eq_(sync.source_modified(uowcommit, a1, a_mapper, pairs), False)

    def test_source_modified_no_pairs(self):
        """``source_modified`` returns False for an empty pair list."""
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        eq_(sync.source_modified(uowcommit, a1, a_mapper, []), False)

    def test_source_modified_modified(self):
        """Changing ``id`` after ``_commit_all`` reports True."""
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        a1.obj().id = 10
        a1._commit_all(a1.dict)
        a1.obj().id = 12
        pairs = [(a_mapper.c.id, b_mapper.c.id)]
        eq_(sync.source_modified(uowcommit, a1, a_mapper, pairs), True)

    def test_source_modified_composite(self):
        """With two pairs, a change to the second source reports True."""
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        a1.obj().foo = 10
        a1._commit_all(a1.dict)
        a1.obj().foo = 12
        pairs = [
            (a_mapper.c.id, b_mapper.c.id),
            (a_mapper.c.foo, b_mapper.c.id),
        ]
        eq_(sync.source_modified(uowcommit, a1, a_mapper, pairs), True)

    def test_source_modified_composite_unmodified(self):
        """With two pairs and no change after commit, the result is False."""
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        a1.obj().foo = 10
        a1._commit_all(a1.dict)
        pairs = [
            (a_mapper.c.id, b_mapper.c.id),
            (a_mapper.c.foo, b_mapper.c.id),
        ]
        eq_(sync.source_modified(uowcommit, a1, a_mapper, pairs), False)

    def test_source_modified_no_unmapped(self):
        """``source_modified`` raises when the source is not on ``A``."""
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        pairs = [(b_mapper.c.id, b_mapper.c.id)]
        assert_raises_message(
            orm_exc.UnmappedColumnError,
            "Can't execute sync rule for source column 't2.id'; "
            r"mapper 'mapped class A->t1' does not map this column.",
            sync.source_modified,
            uowcommit,
            a1,
            a_mapper,
            pairs,
        )
272