1from sqlalchemy.testing import eq_, assert_raises, assert_raises_message
2from sqlalchemy import testing
3from sqlalchemy.testing.schema import Table, Column
4from test.orm import _fixtures
5from sqlalchemy.testing import fixtures
6from sqlalchemy import Integer, String, ForeignKey, func
7from sqlalchemy.orm import mapper, relationship, backref, \
8                            create_session, unitofwork, attributes,\
9                            Session, class_mapper, sync, exc as orm_exc
10
11
class AssertsUOW(object):
    """Mixin providing a helper that builds a unit-of-work transaction."""

    def _get_test_uow(self, session):
        """Build a ``UOWTransaction`` from the pending state of *session*.

        States found in ``session._new`` and ``session._dirty_states``
        (excluding anything also present in ``session._deleted``) are
        registered normally; states in ``session._deleted`` are
        registered with ``isdelete=True``.

        :param session: the session whose pending state is inspected.
        :return: the populated ``unitofwork.UOWTransaction``.
        """
        transaction = unitofwork.UOWTransaction(session)

        to_delete = set(session._deleted)
        pending = set(session._new)
        # A state marked for deletion must not also be registered as dirty.
        modified = set(session._dirty_states) - to_delete

        for state in pending | modified:
            transaction.register_object(state)
        for state in to_delete:
            transaction.register_object(state, isdelete=True)

        return transaction
23
class SyncTest(fixtures.MappedTest,
               testing.AssertsExecutionResults, AssertsUOW):
    """Exercise the functions of ``sqlalchemy.orm.sync`` directly.

    Two tables are mapped: ``t1`` to class ``A`` and ``t2`` to class ``B``.
    Each test obtains fresh ``A``/``B`` instances, their instance states,
    both mappers and a unit-of-work transaction from ``_fixture()`` and
    then calls a single ``sync`` function with hand-built column pairs.
    """

    @classmethod
    def define_tables(cls, metadata):
        """Define ``t1`` and ``t2``.

        ``t2.id`` is both the primary key and a foreign key to ``t1.id``;
        ``t2.t1id`` is a second, non-primary-key foreign key to ``t1.id``.
        """
        Table('t1', metadata,
            Column('id', Integer, primary_key=True),
            Column('foo', Integer)
        )
        Table('t2', metadata,
            Column('id', Integer, ForeignKey('t1.id'), primary_key=True),
            Column('t1id', Integer, ForeignKey('t1.id')),
        )

    @classmethod
    def setup_classes(cls):
        """Declare the plain classes ``A`` and ``B`` used by the mappers."""
        class A(cls.Basic):
            pass
        class B(cls.Basic):
            pass

    @classmethod
    def setup_mappers(cls):
        """Map ``A`` to ``t1`` and ``B`` to ``t2``."""
        mapper(cls.classes.A, cls.tables.t1)
        mapper(cls.classes.B, cls.tables.t2)

    def _fixture(self):
        """Create the objects shared by every test.

        New ``A`` and ``B`` instances are created and stored on ``self``
        as ``a1`` and ``b1``; they are not added to the session.

        :return: a 5-tuple ``(uowcommit, a1_state, b1_state, a_mapper,
         b_mapper)`` where the states come from
         ``attributes.instance_state()`` and the mappers from
         ``class_mapper()``.
        """
        A, B = self.classes.A, self.classes.B
        session = create_session()
        a_mapper = class_mapper(A)
        b_mapper = class_mapper(B)
        self.a1 = a1 = A()
        self.b1 = b1 = B()
        # Build the unit of work exactly once; an earlier duplicate call
        # produced a transaction that was overwritten before being used.
        uowcommit = self._get_test_uow(session)
        return (
            uowcommit,
            attributes.instance_state(a1),
            attributes.instance_state(b1),
            a_mapper,
            b_mapper,
        )

    def test_populate(self):
        """``populate()`` copies ``a1.id`` to ``b1.id``.

        With the final argument ``False`` no ``"pk_cascaded"`` entry is
        recorded in ``uowcommit.attributes``.
        """
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        pairs = [(a_mapper.c.id, b_mapper.c.id)]
        a1.obj().id = 7
        assert 'id' not in b1.obj().__dict__
        sync.populate(a1, a_mapper, b1, b_mapper, pairs, uowcommit, False)
        eq_(b1.obj().id, 7)
        eq_(b1.obj().__dict__['id'], 7)
        assert ("pk_cascaded", b1, b_mapper.c.id) not in uowcommit.attributes

    def test_populate_flag_cascaded(self):
        """``populate()`` with the final argument ``True`` records a
        ``"pk_cascaded"`` entry in ``uowcommit.attributes``."""
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        pairs = [(a_mapper.c.id, b_mapper.c.id)]
        a1.obj().id = 7
        assert 'id' not in b1.obj().__dict__
        sync.populate(a1, a_mapper, b1, b_mapper, pairs, uowcommit, True)
        eq_(b1.obj().id, 7)
        eq_(b1.obj().__dict__['id'], 7)
        eq_(uowcommit.attributes[("pk_cascaded", b1, b_mapper.c.id)], True)

    def test_populate_unmapped_source(self):
        """``populate()`` raises when the source column is not mapped by
        the source mapper."""
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        pairs = [(b_mapper.c.id, b_mapper.c.id)]
        assert_raises_message(
            orm_exc.UnmappedColumnError,
            "Can't execute sync rule for source column 't2.id'; "
            r"mapper 'Mapper\|A\|t1' does not map this column.",
            sync.populate,
                a1,
                a_mapper,
                b1,
                b_mapper,
                pairs,
                uowcommit, False
        )

    def test_populate_unmapped_dest(self):
        """``populate()`` raises when the destination column is not mapped
        by the destination mapper."""
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        pairs = [(a_mapper.c.id, a_mapper.c.id,)]
        assert_raises_message(
            orm_exc.UnmappedColumnError,
            "Can't execute sync rule for destination "
            r"column 't1.id'; mapper 'Mapper\|B\|t2' does not map this column.",
            sync.populate,
                a1,
                a_mapper,
                b1,
                b_mapper,
                pairs,
                uowcommit, False
        )

    def test_clear(self):
        """``clear()`` sets a non-primary-key destination column to
        ``None``."""
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        pairs = [(a_mapper.c.id, b_mapper.c.t1id,)]
        b1.obj().t1id = 8
        eq_(b1.obj().__dict__['t1id'], 8)
        sync.clear(b1, b_mapper, pairs)
        eq_(b1.obj().__dict__['t1id'], None)

    def test_clear_pk(self):
        """``clear()`` raises ``AssertionError`` when asked to blank out a
        primary key column that holds a value."""
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        pairs = [(a_mapper.c.id, b_mapper.c.id,)]
        b1.obj().id = 8
        eq_(b1.obj().__dict__['id'], 8)
        assert_raises_message(
            AssertionError,
            "Dependency rule tried to blank-out primary key "
            "column 't2.id' on instance '<B",
            sync.clear, b1, b_mapper, pairs
        )

    def test_clear_unmapped(self):
        """``clear()`` raises when the destination column is not mapped by
        the destination mapper."""
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        pairs = [(a_mapper.c.id, a_mapper.c.foo,)]
        assert_raises_message(
            orm_exc.UnmappedColumnError,
            "Can't execute sync rule for destination "
            r"column 't1.foo'; mapper 'Mapper\|B\|t2' does not "
            "map this column.",
            sync.clear, b1, b_mapper, pairs
        )

    def test_update(self):
        """``update()`` fills ``dest`` with the current value under ``'id'``
        and the previously committed value under ``'old_id'``."""
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        a1.obj().id = 10
        # Commit 10 so that the later assignment of 12 shows up as a change.
        a1._commit_all(a1.dict)
        a1.obj().id = 12
        pairs = [(a_mapper.c.id, b_mapper.c.id,)]
        dest = {}
        sync.update(a1, a_mapper, dest, "old_", pairs)
        eq_(dest, {'id': 12, 'old_id': 10})

    def test_update_unmapped(self):
        """``update()`` raises when the source column is not mapped by the
        source mapper."""
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        pairs = [(b_mapper.c.id, b_mapper.c.id,)]
        dest = {}
        assert_raises_message(
            orm_exc.UnmappedColumnError,
            "Can't execute sync rule for source column 't2.id'; "
            r"mapper 'Mapper\|A\|t1' does not map this column.",
            sync.update, a1, a_mapper, dest, "old_", pairs
        )

    def test_populate_dict(self):
        """``populate_dict()`` copies the source value into ``dest`` under
        the key ``'id'``."""
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        a1.obj().id = 10
        pairs = [(a_mapper.c.id, b_mapper.c.id,)]
        dest = {}
        sync.populate_dict(a1, a_mapper, dest, pairs)
        eq_(dest, {'id':10})

    def test_populate_dict_unmapped(self):
        """``populate_dict()`` raises when the source column is not mapped
        by the source mapper."""
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        a1.obj().id = 10
        pairs = [(b_mapper.c.id, b_mapper.c.id,)]
        dest = {}
        assert_raises_message(
            orm_exc.UnmappedColumnError,
            "Can't execute sync rule for source column 't2.id'; "
            r"mapper 'Mapper\|A\|t1' does not map this column.",
            sync.populate_dict, a1, a_mapper, dest, pairs
        )

    def test_source_modified_unmodified(self):
        """``source_modified()`` is ``False`` when ``id`` has been assigned
        once with no committed value preceding it."""
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        a1.obj().id = 10
        pairs = [(a_mapper.c.id, b_mapper.c.id,)]
        eq_(
            sync.source_modified(uowcommit, a1, a_mapper, pairs),
            False
        )

    def test_source_modified_no_pairs(self):
        """``source_modified()`` is ``False`` for an empty pair list."""
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        eq_(
            sync.source_modified(uowcommit, a1, a_mapper, []),
            False
        )

    def test_source_modified_modified(self):
        """``source_modified()`` is ``True`` once a committed ``id`` is
        replaced by a different value."""
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        a1.obj().id = 10
        a1._commit_all(a1.dict)
        a1.obj().id = 12
        pairs = [(a_mapper.c.id, b_mapper.c.id,)]
        eq_(
            sync.source_modified(uowcommit, a1, a_mapper, pairs),
            True
        )

    def test_source_modified_composite(self):
        """``source_modified()`` is ``True`` when only the second of two
        source columns (``foo``) changed after a commit."""
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        a1.obj().foo = 10
        a1._commit_all(a1.dict)
        a1.obj().foo = 12
        pairs = [(a_mapper.c.id, b_mapper.c.id,),
                (a_mapper.c.foo, b_mapper.c.id)]
        eq_(
            sync.source_modified(uowcommit, a1, a_mapper, pairs),
            True
        )

    def test_source_modified_composite_unmodified(self):
        """``source_modified()`` is ``False`` for two source columns when
        nothing changed after the commit."""
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        a1.obj().foo = 10
        a1._commit_all(a1.dict)
        pairs = [(a_mapper.c.id, b_mapper.c.id,),
                (a_mapper.c.foo, b_mapper.c.id)]
        eq_(
            sync.source_modified(uowcommit, a1, a_mapper, pairs),
            False
        )

    def test_source_modified_no_unmapped(self):
        """``source_modified()`` raises when the source column is not
        mapped by the source mapper."""
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        pairs = [(b_mapper.c.id, b_mapper.c.id,)]
        assert_raises_message(
            orm_exc.UnmappedColumnError,
            "Can't execute sync rule for source column 't2.id'; "
            r"mapper 'Mapper\|A\|t1' does not map this column.",
            sync.source_modified, uowcommit, a1, a_mapper, pairs
        )
247