1from sqlalchemy import ForeignKey 2from sqlalchemy import Integer 3from sqlalchemy import testing 4from sqlalchemy.orm import attributes 5from sqlalchemy.orm import class_mapper 6from sqlalchemy.orm import exc as orm_exc 7from sqlalchemy.orm import sync 8from sqlalchemy.orm import unitofwork 9from sqlalchemy.testing import assert_raises_message 10from sqlalchemy.testing import eq_ 11from sqlalchemy.testing import fixtures 12from sqlalchemy.testing.fixtures import fixture_session 13from sqlalchemy.testing.schema import Column 14from sqlalchemy.testing.schema import Table 15 16 17class AssertsUOW(object): 18 def _get_test_uow(self, session): 19 uow = unitofwork.UOWTransaction(session) 20 deleted = set(session._deleted) 21 new = set(session._new) 22 dirty = set(session._dirty_states).difference(deleted) 23 for s in new.union(dirty): 24 uow.register_object(s) 25 for d in deleted: 26 uow.register_object(d, isdelete=True) 27 return uow 28 29 30class SyncTest( 31 fixtures.MappedTest, testing.AssertsExecutionResults, AssertsUOW 32): 33 @classmethod 34 def define_tables(cls, metadata): 35 Table( 36 "t1", 37 metadata, 38 Column("id", Integer, primary_key=True), 39 Column("foo", Integer), 40 ) 41 Table( 42 "t2", 43 metadata, 44 Column("id", Integer, ForeignKey("t1.id"), primary_key=True), 45 Column("t1id", Integer, ForeignKey("t1.id")), 46 ) 47 48 @classmethod 49 def setup_classes(cls): 50 class A(cls.Basic): 51 pass 52 53 class B(cls.Basic): 54 pass 55 56 @classmethod 57 def setup_mappers(cls): 58 cls.mapper_registry.map_imperatively(cls.classes.A, cls.tables.t1) 59 cls.mapper_registry.map_imperatively(cls.classes.B, cls.tables.t2) 60 61 def _fixture(self): 62 A, B = self.classes.A, self.classes.B 63 session = fixture_session() 64 uowcommit = self._get_test_uow(session) 65 a_mapper = class_mapper(A) 66 b_mapper = class_mapper(B) 67 self.a1 = a1 = A() 68 self.b1 = b1 = B() 69 uowcommit = self._get_test_uow(session) 70 return ( 71 uowcommit, 72 attributes.instance_state(a1), 73 attributes.instance_state(b1), 74 a_mapper, 75 b_mapper, 76 ) 77 78 def test_populate(self): 79 uowcommit, a1, b1, a_mapper, b_mapper = self._fixture() 80 pairs = [(a_mapper.c.id, b_mapper.c.id)] 81 a1.obj().id = 7 82 assert "id" not in b1.obj().__dict__ 83 sync.populate(a1, a_mapper, b1, b_mapper, pairs, uowcommit, False) 84 eq_(b1.obj().id, 7) 85 eq_(b1.obj().__dict__["id"], 7) 86 assert ("pk_cascaded", b1, b_mapper.c.id) not in uowcommit.attributes 87 88 def test_populate_flag_cascaded(self): 89 uowcommit, a1, b1, a_mapper, b_mapper = self._fixture() 90 pairs = [(a_mapper.c.id, b_mapper.c.id)] 91 a1.obj().id = 7 92 assert "id" not in b1.obj().__dict__ 93 sync.populate(a1, a_mapper, b1, b_mapper, pairs, uowcommit, True) 94 eq_(b1.obj().id, 7) 95 eq_(b1.obj().__dict__["id"], 7) 96 eq_(uowcommit.attributes[("pk_cascaded", b1, b_mapper.c.id)], True) 97 98 def test_populate_unmapped_source(self): 99 uowcommit, a1, b1, a_mapper, b_mapper = self._fixture() 100 pairs = [(b_mapper.c.id, b_mapper.c.id)] 101 assert_raises_message( 102 orm_exc.UnmappedColumnError, 103 "Can't execute sync rule for source column 't2.id'; " 104 r"mapper 'mapped class A->t1' does not map this column.", 105 sync.populate, 106 a1, 107 a_mapper, 108 b1, 109 b_mapper, 110 pairs, 111 uowcommit, 112 False, 113 ) 114 115 def test_populate_unmapped_dest(self): 116 uowcommit, a1, b1, a_mapper, b_mapper = self._fixture() 117 pairs = [(a_mapper.c.id, a_mapper.c.id)] 118 assert_raises_message( 119 orm_exc.UnmappedColumnError, 120 r"Can't execute sync rule for destination " 121 r"column 't1.id'; " 122 r"mapper 'mapped class B->t2' does not map this column.", 123 sync.populate, 124 a1, 125 a_mapper, 126 b1, 127 b_mapper, 128 pairs, 129 uowcommit, 130 False, 131 ) 132 133 def test_clear(self): 134 uowcommit, a1, b1, a_mapper, b_mapper = self._fixture() 135 pairs = [(a_mapper.c.id, b_mapper.c.t1id)] 136 b1.obj().t1id = 8 137 eq_(b1.obj().__dict__["t1id"], 8) 138 sync.clear(b1, b_mapper, pairs) 139 eq_(b1.obj().__dict__["t1id"], None) 140 141 def test_clear_pk(self): 142 uowcommit, a1, b1, a_mapper, b_mapper = self._fixture() 143 pairs = [(a_mapper.c.id, b_mapper.c.id)] 144 b1.obj().id = 8 145 eq_(b1.obj().__dict__["id"], 8) 146 assert_raises_message( 147 AssertionError, 148 "Dependency rule tried to blank-out primary key " 149 "column 't2.id' on instance '<B", 150 sync.clear, 151 b1, 152 b_mapper, 153 pairs, 154 ) 155 156 def test_clear_unmapped(self): 157 uowcommit, a1, b1, a_mapper, b_mapper = self._fixture() 158 pairs = [(a_mapper.c.id, a_mapper.c.foo)] 159 assert_raises_message( 160 orm_exc.UnmappedColumnError, 161 "Can't execute sync rule for destination " 162 r"column 't1.foo'; mapper 'mapped class B->t2' does not " 163 "map this column.", 164 sync.clear, 165 b1, 166 b_mapper, 167 pairs, 168 ) 169 170 def test_update(self): 171 uowcommit, a1, b1, a_mapper, b_mapper = self._fixture() 172 a1.obj().id = 10 173 a1._commit_all(a1.dict) 174 a1.obj().id = 12 175 pairs = [(a_mapper.c.id, b_mapper.c.id)] 176 dest = {} 177 sync.update(a1, a_mapper, dest, "old_", pairs) 178 eq_(dest, {"id": 12, "old_id": 10}) 179 180 def test_update_unmapped(self): 181 uowcommit, a1, b1, a_mapper, b_mapper = self._fixture() 182 pairs = [(b_mapper.c.id, b_mapper.c.id)] 183 dest = {} 184 assert_raises_message( 185 orm_exc.UnmappedColumnError, 186 "Can't execute sync rule for source column 't2.id'; " 187 r"mapper 'mapped class A->t1' does not map this column.", 188 sync.update, 189 a1, 190 a_mapper, 191 dest, 192 "old_", 193 pairs, 194 ) 195 196 def test_populate_dict(self): 197 uowcommit, a1, b1, a_mapper, b_mapper = self._fixture() 198 a1.obj().id = 10 199 pairs = [(a_mapper.c.id, b_mapper.c.id)] 200 dest = {} 201 sync.populate_dict(a1, a_mapper, dest, pairs) 202 eq_(dest, {"id": 10}) 203 204 def test_populate_dict_unmapped(self): 205 uowcommit, a1, b1, a_mapper, b_mapper = self._fixture() 206 a1.obj().id = 10 207 pairs = [(b_mapper.c.id, b_mapper.c.id)] 208 dest = {} 209 assert_raises_message( 210 orm_exc.UnmappedColumnError, 211 "Can't execute sync rule for source column 't2.id'; " 212 r"mapper 'mapped class A->t1' does not map this column.", 213 sync.populate_dict, 214 a1, 215 a_mapper, 216 dest, 217 pairs, 218 ) 219 220 def test_source_modified_unmodified(self): 221 uowcommit, a1, b1, a_mapper, b_mapper = self._fixture() 222 a1.obj().id = 10 223 pairs = [(a_mapper.c.id, b_mapper.c.id)] 224 eq_(sync.source_modified(uowcommit, a1, a_mapper, pairs), False) 225 226 def test_source_modified_no_pairs(self): 227 uowcommit, a1, b1, a_mapper, b_mapper = self._fixture() 228 eq_(sync.source_modified(uowcommit, a1, a_mapper, []), False) 229 230 def test_source_modified_modified(self): 231 uowcommit, a1, b1, a_mapper, b_mapper = self._fixture() 232 a1.obj().id = 10 233 a1._commit_all(a1.dict) 234 a1.obj().id = 12 235 pairs = [(a_mapper.c.id, b_mapper.c.id)] 236 eq_(sync.source_modified(uowcommit, a1, a_mapper, pairs), True) 237 238 def test_source_modified_composite(self): 239 uowcommit, a1, b1, a_mapper, b_mapper = self._fixture() 240 a1.obj().foo = 10 241 a1._commit_all(a1.dict) 242 a1.obj().foo = 12 243 pairs = [ 244 (a_mapper.c.id, b_mapper.c.id), 245 (a_mapper.c.foo, b_mapper.c.id), 246 ] 247 eq_(sync.source_modified(uowcommit, a1, a_mapper, pairs), True) 248 249 def test_source_modified_composite_unmodified(self): 250 uowcommit, a1, b1, a_mapper, b_mapper = self._fixture() 251 a1.obj().foo = 10 252 a1._commit_all(a1.dict) 253 pairs = [ 254 (a_mapper.c.id, b_mapper.c.id), 255 (a_mapper.c.foo, b_mapper.c.id), 256 ] 257 eq_(sync.source_modified(uowcommit, a1, a_mapper, pairs), False) 258 259 def test_source_modified_no_unmapped(self): 260 uowcommit, a1, b1, a_mapper, b_mapper = self._fixture() 261 pairs = [(b_mapper.c.id, b_mapper.c.id)] 262 assert_raises_message( 263 orm_exc.UnmappedColumnError, 264 "Can't execute sync rule for source column 't2.id'; " 265 r"mapper 'mapped class A->t1' does not map this column.", 266 sync.source_modified, 267 uowcommit, 268 a1, 269 a_mapper, 270 pairs, 271 ) 272