from sqlalchemy.testing import eq_, assert_raises, assert_raises_message
from sqlalchemy import testing
from sqlalchemy.testing.schema import Table, Column
from test.orm import _fixtures
from sqlalchemy.testing import fixtures
from sqlalchemy import Integer, String, ForeignKey, func
from sqlalchemy.orm import mapper, relationship, backref, \
    create_session, unitofwork, attributes,\
    Session, class_mapper, sync, exc as orm_exc


class AssertsUOW(object):
    """Mixin that builds a ``UOWTransaction`` from a session's pending state."""

    def _get_test_uow(self, session):
        """Return a ``UOWTransaction`` for ``session`` with its states registered.

        New and dirty states (excluding those that are also deleted) are
        registered normally; deleted states are registered with
        ``isdelete=True``.
        """
        uow = unitofwork.UOWTransaction(session)
        deleted = set(session._deleted)
        new = set(session._new)
        # A state marked deleted must not also be registered as dirty.
        dirty = set(session._dirty_states).difference(deleted)
        for s in new.union(dirty):
            uow.register_object(s)
        for d in deleted:
            uow.register_object(d, isdelete=True)
        return uow


class SyncTest(fixtures.MappedTest,
               testing.AssertsExecutionResults, AssertsUOW):
    """Tests for the functions in ``sqlalchemy.orm.sync``.

    Exercises ``populate``, ``clear``, ``update``, ``populate_dict`` and
    ``source_modified`` against two mapped classes, ``A`` (table ``t1``)
    and ``B`` (table ``t2``).
    """

    @classmethod
    def define_tables(cls, metadata):
        """Define ``t1`` and ``t2``; both ``t2`` columns reference ``t1.id``."""
        Table('t1', metadata,
              Column('id', Integer, primary_key=True),
              Column('foo', Integer)
              )
        Table('t2', metadata,
              Column('id', Integer, ForeignKey('t1.id'), primary_key=True),
              Column('t1id', Integer, ForeignKey('t1.id')),
              )

    @classmethod
    def setup_classes(cls):
        """Declare the plain classes ``A`` and ``B`` used by the mappers."""
        class A(cls.Basic):
            pass

        class B(cls.Basic):
            pass

    @classmethod
    def setup_mappers(cls):
        """Map ``A`` to ``t1`` and ``B`` to ``t2``."""
        mapper(cls.classes.A, cls.tables.t1)
        mapper(cls.classes.B, cls.tables.t2)

    def _fixture(self):
        """Build the objects shared by every test.

        Returns a 5-tuple ``(uowcommit, a1_state, b1_state, a_mapper,
        b_mapper)`` where the states are the instance states of a fresh
        ``A()`` and ``B()``.
        """
        A, B = self.classes.A, self.classes.B
        session = create_session()
        a_mapper = class_mapper(A)
        b_mapper = class_mapper(B)
        # NOTE(review): the instances are stored on ``self``, presumably to
        # keep them alive, since the tests only reach them through
        # ``state.obj()`` -- confirm.
        self.a1 = a1 = A()
        self.b1 = b1 = B()
        # Built once; the original created a second, discarded transaction
        # before the instances existed.
        uowcommit = self._get_test_uow(session)
        return uowcommit,\
            attributes.instance_state(a1),\
            attributes.instance_state(b1),\
            a_mapper, b_mapper

    def test_populate(self):
        # With the final flag False, the value is copied to the destination
        # and no "pk_cascaded" marker is recorded on the transaction.
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        pairs = [(a_mapper.c.id, b_mapper.c.id)]
        a1.obj().id = 7
        assert 'id' not in b1.obj().__dict__
        sync.populate(a1, a_mapper, b1, b_mapper, pairs, uowcommit, False)
        eq_(b1.obj().id, 7)
        eq_(b1.obj().__dict__['id'], 7)
        assert ("pk_cascaded", b1, b_mapper.c.id) not in uowcommit.attributes

    def test_populate_flag_cascaded(self):
        # With the final flag True, a "pk_cascaded" marker is expected in
        # the transaction's attributes.
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        pairs = [(a_mapper.c.id, b_mapper.c.id)]
        a1.obj().id = 7
        assert 'id' not in b1.obj().__dict__
        sync.populate(a1, a_mapper, b1, b_mapper, pairs, uowcommit, True)
        eq_(b1.obj().id, 7)
        eq_(b1.obj().__dict__['id'], 7)
        eq_(uowcommit.attributes[("pk_cascaded", b1, b_mapper.c.id)], True)

    def test_populate_unmapped_source(self):
        # The source column belongs to t2, which the A mapper does not map.
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        pairs = [(b_mapper.c.id, b_mapper.c.id)]
        assert_raises_message(
            orm_exc.UnmappedColumnError,
            "Can't execute sync rule for source column 't2.id'; "
            r"mapper 'Mapper\|A\|t1' does not map this column.",
            sync.populate,
            a1,
            a_mapper,
            b1,
            b_mapper,
            pairs,
            uowcommit, False
        )

    def test_populate_unmapped_dest(self):
        # The destination column belongs to t1, which the B mapper does not
        # map.
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        pairs = [(a_mapper.c.id, a_mapper.c.id,)]
        assert_raises_message(
            orm_exc.UnmappedColumnError,
            "Can't execute sync rule for destination "
            r"column 't1.id'; mapper 'Mapper\|B\|t2' does not map this column.",
            sync.populate,
            a1,
            a_mapper,
            b1,
            b_mapper,
            pairs,
            uowcommit, False
        )

    def test_clear(self):
        # Clearing a non-primary-key destination column sets it to None.
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        pairs = [(a_mapper.c.id, b_mapper.c.t1id,)]
        b1.obj().t1id = 8
        eq_(b1.obj().__dict__['t1id'], 8)
        sync.clear(b1, b_mapper, pairs)
        eq_(b1.obj().__dict__['t1id'], None)

    def test_clear_pk(self):
        # Clearing a primary key destination column is expected to fail
        # with an AssertionError.
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        pairs = [(a_mapper.c.id, b_mapper.c.id,)]
        b1.obj().id = 8
        eq_(b1.obj().__dict__['id'], 8)
        assert_raises_message(
            AssertionError,
            "Dependency rule tried to blank-out primary key "
            "column 't2.id' on instance '<B",
            sync.clear, b1, b_mapper, pairs
        )

    def test_clear_unmapped(self):
        # The destination column t1.foo is not mapped by the B mapper.
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        pairs = [(a_mapper.c.id, a_mapper.c.foo,)]
        assert_raises_message(
            orm_exc.UnmappedColumnError,
            "Can't execute sync rule for destination "
            r"column 't1.foo'; mapper 'Mapper\|B\|t2' does not "
            "map this column.",
            sync.clear, b1, b_mapper, pairs
        )

    def test_update(self):
        # After committing id=10 and then setting id=12, the destination
        # dict is expected to hold the current value under the column key
        # and the committed value under the "old_"-prefixed key.
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        a1.obj().id = 10
        a1._commit_all(a1.dict)
        a1.obj().id = 12
        pairs = [(a_mapper.c.id, b_mapper.c.id,)]
        dest = {}
        sync.update(a1, a_mapper, dest, "old_", pairs)
        eq_(dest, {'id': 12, 'old_id': 10})

    def test_update_unmapped(self):
        # The source column belongs to t2, which the A mapper does not map.
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        pairs = [(b_mapper.c.id, b_mapper.c.id,)]
        dest = {}
        assert_raises_message(
            orm_exc.UnmappedColumnError,
            "Can't execute sync rule for source column 't2.id'; "
            r"mapper 'Mapper\|A\|t1' does not map this column.",
            sync.update, a1, a_mapper, dest, "old_", pairs
        )

    def test_populate_dict(self):
        # The source value is written into the dict under the destination
        # column's key.
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        a1.obj().id = 10
        pairs = [(a_mapper.c.id, b_mapper.c.id,)]
        dest = {}
        sync.populate_dict(a1, a_mapper, dest, pairs)
        eq_(dest, {'id': 10})

    def test_populate_dict_unmapped(self):
        # The source column belongs to t2, which the A mapper does not map.
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        a1.obj().id = 10
        pairs = [(b_mapper.c.id, b_mapper.c.id,)]
        dest = {}
        assert_raises_message(
            orm_exc.UnmappedColumnError,
            "Can't execute sync rule for source column 't2.id'; "
            r"mapper 'Mapper\|A\|t1' does not map this column.",
            sync.populate_dict, a1, a_mapper, dest, pairs
        )

    def test_source_modified_unmodified(self):
        # A value that was set but never committed is expected not to
        # count as a modification.
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        a1.obj().id = 10
        pairs = [(a_mapper.c.id, b_mapper.c.id,)]
        eq_(
            sync.source_modified(uowcommit, a1, a_mapper, pairs),
            False
        )

    def test_source_modified_no_pairs(self):
        # With no column pairs there is nothing to inspect.
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        eq_(
            sync.source_modified(uowcommit, a1, a_mapper, []),
            False
        )

    def test_source_modified_modified(self):
        # A committed value that is subsequently changed counts as modified.
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        a1.obj().id = 10
        a1._commit_all(a1.dict)
        a1.obj().id = 12
        pairs = [(a_mapper.c.id, b_mapper.c.id,)]
        eq_(
            sync.source_modified(uowcommit, a1, a_mapper, pairs),
            True
        )

    def test_source_modified_composite(self):
        # With two pairs, a change to only the second source column (foo)
        # is enough for the result to be True.
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        a1.obj().foo = 10
        a1._commit_all(a1.dict)
        a1.obj().foo = 12
        pairs = [(a_mapper.c.id, b_mapper.c.id,),
                 (a_mapper.c.foo, b_mapper.c.id)]
        eq_(
            sync.source_modified(uowcommit, a1, a_mapper, pairs),
            True
        )

    def test_source_modified_composite_unmodified(self):
        # With two pairs and no change after the commit, the result is
        # False.
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        a1.obj().foo = 10
        a1._commit_all(a1.dict)
        pairs = [(a_mapper.c.id, b_mapper.c.id,),
                 (a_mapper.c.foo, b_mapper.c.id)]
        eq_(
            sync.source_modified(uowcommit, a1, a_mapper, pairs),
            False
        )

    def test_source_modified_no_unmapped(self):
        # The source column belongs to t2, which the A mapper does not map.
        uowcommit, a1, b1, a_mapper, b_mapper = self._fixture()
        pairs = [(b_mapper.c.id, b_mapper.c.id,)]
        assert_raises_message(
            orm_exc.UnmappedColumnError,
            "Can't execute sync rule for source column 't2.id'; "
            r"mapper 'Mapper\|A\|t1' does not map this column.",
            sync.source_modified, uowcommit, a1, a_mapper, pairs
        )