1from sqlalchemy.testing import eq_ 2from sqlalchemy.util import pickle 3import sqlalchemy as sa 4from sqlalchemy import testing 5from sqlalchemy.testing.util import picklers 6from sqlalchemy.testing import assert_raises_message 7from sqlalchemy import Integer, String, ForeignKey, exc, MetaData 8from sqlalchemy.testing.schema import Table, Column 9from sqlalchemy.orm import mapper, relationship, create_session, \ 10 sessionmaker, attributes, interfaces,\ 11 clear_mappers, exc as orm_exc,\ 12 configure_mappers, Session, lazyload_all,\ 13 lazyload, aliased 14from sqlalchemy.orm import state as sa_state 15from sqlalchemy.orm import instrumentation 16from sqlalchemy.orm.collections import attribute_mapped_collection, \ 17 column_mapped_collection 18from sqlalchemy.testing import fixtures 19from test.orm import _fixtures 20from sqlalchemy.testing.pickleable import User, Address, Dingaling, Order, \ 21 Child1, Child2, Parent, Screen, EmailUser 22 23 24class PickleTest(fixtures.MappedTest): 25 26 @classmethod 27 def define_tables(cls, metadata): 28 Table('users', metadata, 29 Column('id', Integer, primary_key=True, test_needs_autoincrement=True), 30 Column('name', String(30), nullable=False), 31 test_needs_acid=True, 32 test_needs_fk=True 33 ) 34 35 Table('addresses', metadata, 36 Column('id', Integer, primary_key=True, test_needs_autoincrement=True), 37 Column('user_id', None, ForeignKey('users.id')), 38 Column('email_address', String(50), nullable=False), 39 test_needs_acid=True, 40 test_needs_fk=True 41 ) 42 Table('orders', metadata, 43 Column('id', Integer, primary_key=True, test_needs_autoincrement=True), 44 Column('user_id', None, ForeignKey('users.id')), 45 Column('address_id', None, ForeignKey('addresses.id')), 46 Column('description', String(30)), 47 Column('isopen', Integer), 48 test_needs_acid=True, 49 test_needs_fk=True 50 ) 51 Table("dingalings", metadata, 52 Column('id', Integer, primary_key=True, test_needs_autoincrement=True), 53 Column('address_id', None, ForeignKey('addresses.id')), 54 Column('data', String(30)), 55 test_needs_acid=True, 56 test_needs_fk=True 57 ) 58 59 60 def test_transient(self): 61 users, addresses = (self.tables.users, 62 self.tables.addresses) 63 64 mapper(User, users, properties={ 65 'addresses':relationship(Address, backref="user") 66 }) 67 mapper(Address, addresses) 68 69 sess = create_session() 70 u1 = User(name='ed') 71 u1.addresses.append(Address(email_address='ed@bar.com')) 72 73 u2 = pickle.loads(pickle.dumps(u1)) 74 sess.add(u2) 75 sess.flush() 76 77 sess.expunge_all() 78 79 eq_(u1, sess.query(User).get(u2.id)) 80 81 def test_no_mappers(self): 82 users = self.tables.users 83 84 85 umapper = mapper(User, users) 86 u1 = User(name='ed') 87 u1_pickled = pickle.dumps(u1, -1) 88 89 clear_mappers() 90 91 assert_raises_message( 92 orm_exc.UnmappedInstanceError, 93 "Cannot deserialize object of type <class 'sqlalchemy.testing.pickleable.User'> - no mapper()", 94 pickle.loads, u1_pickled) 95 96 def test_no_instrumentation(self): 97 users = self.tables.users 98 99 100 umapper = mapper(User, users) 101 u1 = User(name='ed') 102 u1_pickled = pickle.dumps(u1, -1) 103 104 clear_mappers() 105 106 umapper = mapper(User, users) 107 108 u1 = pickle.loads(u1_pickled) 109 # this fails unless the InstanceState 110 # compiles the mapper 111 eq_(str(u1), "User(name='ed')") 112 113 114 def test_class_deferred_cols(self): 115 addresses, users = (self.tables.addresses, 116 self.tables.users) 117 118 mapper(User, users, properties={ 119 'name': sa.orm.deferred(users.c.name), 120 'addresses': relationship(Address, backref="user") 121 }) 122 mapper(Address, addresses, properties={ 123 'email_address': sa.orm.deferred(addresses.c.email_address) 124 }) 125 sess = create_session() 126 u1 = User(name='ed') 127 u1.addresses.append(Address(email_address='ed@bar.com')) 128 sess.add(u1) 129 sess.flush() 130 sess.expunge_all() 131 u1 = sess.query(User).get(u1.id) 132 assert 'name' not in u1.__dict__ 133 assert 'addresses' not in u1.__dict__ 134 135 u2 = pickle.loads(pickle.dumps(u1)) 136 sess2 = create_session() 137 sess2.add(u2) 138 eq_(u2.name, 'ed') 139 eq_(u2, User(name='ed', addresses=[Address(email_address='ed@bar.com')])) 140 141 u2 = pickle.loads(pickle.dumps(u1)) 142 sess2 = create_session() 143 u2 = sess2.merge(u2, load=False) 144 eq_(u2.name, 'ed') 145 eq_(u2, User(name='ed', addresses=[Address(email_address='ed@bar.com')])) 146 147 def test_instance_lazy_relation_loaders(self): 148 users, addresses = (self.tables.users, 149 self.tables.addresses) 150 151 mapper(User, users, properties={ 152 'addresses': relationship(Address, lazy='noload') 153 }) 154 mapper(Address, addresses) 155 156 sess = Session() 157 u1 = User(name='ed', addresses=[ 158 Address( 159 email_address='ed@bar.com', 160 ) 161 ]) 162 163 sess.add(u1) 164 sess.commit() 165 sess.close() 166 167 u1 = sess.query(User).options( 168 lazyload(User.addresses) 169 ).first() 170 u2 = pickle.loads(pickle.dumps(u1)) 171 172 sess = Session() 173 sess.add(u2) 174 assert u2.addresses 175 176 @testing.requires.non_broken_pickle 177 def test_instance_deferred_cols(self): 178 users, addresses = (self.tables.users, 179 self.tables.addresses) 180 181 mapper(User, users, properties={ 182 'addresses':relationship(Address, backref="user") 183 }) 184 mapper(Address, addresses) 185 186 sess = create_session() 187 u1 = User(name='ed') 188 u1.addresses.append(Address(email_address='ed@bar.com')) 189 sess.add(u1) 190 sess.flush() 191 sess.expunge_all() 192 193 u1 = sess.query(User).\ 194 options(sa.orm.defer('name'), 195 sa.orm.defer('addresses.email_address')).\ 196 get(u1.id) 197 assert 'name' not in u1.__dict__ 198 assert 'addresses' not in u1.__dict__ 199 200 u2 = pickle.loads(pickle.dumps(u1)) 201 sess2 = create_session() 202 sess2.add(u2) 203 eq_(u2.name, 'ed') 204 assert 'addresses' not in u2.__dict__ 205 ad = u2.addresses[0] 206 assert 'email_address' not in ad.__dict__ 207 eq_(ad.email_address, 'ed@bar.com') 208 eq_(u2, User(name='ed', addresses=[Address(email_address='ed@bar.com')])) 209 210 u2 = pickle.loads(pickle.dumps(u1)) 211 sess2 = create_session() 212 u2 = sess2.merge(u2, load=False) 213 eq_(u2.name, 'ed') 214 assert 'addresses' not in u2.__dict__ 215 ad = u2.addresses[0] 216 217 # mapper options now transmit over merge(), 218 # new as of 0.6, so email_address is deferred. 219 assert 'email_address' not in ad.__dict__ 220 221 eq_(ad.email_address, 'ed@bar.com') 222 eq_(u2, User(name='ed', addresses=[Address(email_address='ed@bar.com')])) 223 224 def test_pickle_protocols(self): 225 users, addresses = (self.tables.users, 226 self.tables.addresses) 227 228 mapper(User, users, properties={ 229 'addresses':relationship(Address, backref="user") 230 }) 231 mapper(Address, addresses) 232 233 sess = sessionmaker()() 234 u1 = User(name='ed') 235 u1.addresses.append(Address(email_address='ed@bar.com')) 236 sess.add(u1) 237 sess.commit() 238 239 u1 = sess.query(User).first() 240 u1.addresses 241 242 for loads, dumps in picklers(): 243 u2 = loads(dumps(u1)) 244 eq_(u1, u2) 245 246 def test_09_pickle(self): 247 users = self.tables.users 248 mapper(User, users) 249 sess = Session() 250 sess.add(User(id=1, name='ed')) 251 sess.commit() 252 sess.close() 253 254 inst = User(id=1, name='ed') 255 del inst._sa_instance_state 256 257 state = sa_state.InstanceState.__new__(sa_state.InstanceState) 258 state_09 = { 259 'class_': User, 260 'modified': False, 261 'committed_state': {}, 262 'instance': inst, 263 'callables': {'name': state, 'id': state}, 264 'key': (User, (1,)), 265 'expired': True} 266 manager = instrumentation._SerializeManager.__new__( 267 instrumentation._SerializeManager) 268 manager.class_ = User 269 state_09['manager'] = manager 270 state.__setstate__(state_09) 271 272 sess = Session() 273 sess.add(inst) 274 eq_(inst.name, 'ed') 275 276 @testing.requires.non_broken_pickle 277 def test_options_with_descriptors(self): 278 users, addresses, dingalings = (self.tables.users, 279 self.tables.addresses, 280 self.tables.dingalings) 281 282 mapper(User, users, properties={ 283 'addresses':relationship(Address, backref="user") 284 }) 285 mapper(Address, addresses, properties={ 286 'dingaling':relationship(Dingaling) 287 }) 288 mapper(Dingaling, dingalings) 289 sess = create_session() 290 u1 = User(name='ed') 291 u1.addresses.append(Address(email_address='ed@bar.com')) 292 sess.add(u1) 293 sess.flush() 294 sess.expunge_all() 295 296 for opt in [ 297 sa.orm.joinedload(User.addresses), 298 sa.orm.joinedload("addresses"), 299 sa.orm.defer("name"), 300 sa.orm.defer(User.name), 301 sa.orm.joinedload("addresses", Address.dingaling), 302 ]: 303 opt2 = pickle.loads(pickle.dumps(opt)) 304 eq_(opt.path, opt2.path) 305 306 u1 = sess.query(User).options(opt).first() 307 u2 = pickle.loads(pickle.dumps(u1)) 308 309 def test_collection_setstate(self): 310 """test a particular cycle that requires CollectionAdapter 311 to not rely upon InstanceState to deserialize.""" 312 313 m = MetaData() 314 c1 = Table('c1', m, 315 Column('parent_id', String, 316 ForeignKey('p.id'), primary_key=True) 317 ) 318 c2 = Table('c2', m, 319 Column('parent_id', String, 320 ForeignKey('p.id'), primary_key=True) 321 ) 322 p = Table('p', m, 323 Column('id', String, primary_key=True) 324 ) 325 326 mapper(Parent, p, properties={ 327 'children1':relationship(Child1), 328 'children2':relationship(Child2) 329 }) 330 mapper(Child1, c1) 331 mapper(Child2, c2) 332 333 obj = Parent() 334 screen1 = Screen(obj) 335 screen1.errors = [obj.children1, obj.children2] 336 screen2 = Screen(Child2(), screen1) 337 pickle.loads(pickle.dumps(screen2)) 338 339 def test_exceptions(self): 340 class Foo(object): 341 pass 342 users = self.tables.users 343 mapper(User, users) 344 345 for sa_exc in ( 346 orm_exc.UnmappedInstanceError(Foo()), 347 orm_exc.UnmappedClassError(Foo), 348 orm_exc.ObjectDeletedError(attributes.instance_state(User())), 349 ): 350 for loads, dumps in picklers(): 351 repickled = loads(dumps(sa_exc)) 352 eq_(repickled.args[0], sa_exc.args[0]) 353 354 def test_attribute_mapped_collection(self): 355 users, addresses = self.tables.users, self.tables.addresses 356 357 mapper(User, users, properties={ 358 'addresses':relationship( 359 Address, 360 collection_class= 361 attribute_mapped_collection('email_address') 362 ) 363 }) 364 mapper(Address, addresses) 365 u1 = User() 366 u1.addresses = {"email1":Address(email_address="email1")} 367 for loads, dumps in picklers(): 368 repickled = loads(dumps(u1)) 369 eq_(u1.addresses, repickled.addresses) 370 eq_(repickled.addresses['email1'], 371 Address(email_address="email1")) 372 373 def test_column_mapped_collection(self): 374 users, addresses = self.tables.users, self.tables.addresses 375 376 mapper(User, users, properties={ 377 'addresses':relationship( 378 Address, 379 collection_class= 380 column_mapped_collection( 381 addresses.c.email_address) 382 ) 383 }) 384 mapper(Address, addresses) 385 u1 = User() 386 u1.addresses = { 387 "email1":Address(email_address="email1"), 388 "email2":Address(email_address="email2") 389 } 390 for loads, dumps in picklers(): 391 repickled = loads(dumps(u1)) 392 eq_(u1.addresses, repickled.addresses) 393 eq_(repickled.addresses['email1'], 394 Address(email_address="email1")) 395 396 def test_composite_column_mapped_collection(self): 397 users, addresses = self.tables.users, self.tables.addresses 398 399 mapper(User, users, properties={ 400 'addresses':relationship( 401 Address, 402 collection_class= 403 column_mapped_collection([ 404 addresses.c.id, 405 addresses.c.email_address]) 406 ) 407 }) 408 mapper(Address, addresses) 409 u1 = User() 410 u1.addresses = { 411 (1, "email1"):Address(id=1, email_address="email1"), 412 (2, "email2"):Address(id=2, email_address="email2") 413 } 414 for loads, dumps in picklers(): 415 repickled = loads(dumps(u1)) 416 eq_(u1.addresses, repickled.addresses) 417 eq_(repickled.addresses[(1, 'email1')], 418 Address(id=1, email_address="email1")) 419 420class PolymorphicDeferredTest(fixtures.MappedTest): 421 @classmethod 422 def define_tables(cls, metadata): 423 Table('users', metadata, 424 Column('id', Integer, primary_key=True, test_needs_autoincrement=True), 425 Column('name', String(30)), 426 Column('type', String(30))) 427 Table('email_users', metadata, 428 Column('id', Integer, ForeignKey('users.id'), primary_key=True), 429 Column('email_address', String(30))) 430 431 432 def test_polymorphic_deferred(self): 433 email_users, users = (self.tables.email_users, 434 self.tables.users, 435 ) 436 437 mapper(User, users, polymorphic_identity='user', polymorphic_on=users.c.type) 438 mapper(EmailUser, email_users, inherits=User, polymorphic_identity='emailuser') 439 440 eu = EmailUser(name="user1", email_address='foo@bar.com') 441 sess = create_session() 442 sess.add(eu) 443 sess.flush() 444 sess.expunge_all() 445 446 eu = sess.query(User).first() 447 eu2 = pickle.loads(pickle.dumps(eu)) 448 sess2 = create_session() 449 sess2.add(eu2) 450 assert 'email_address' not in eu2.__dict__ 451 eq_(eu2.email_address, 'foo@bar.com') 452 453class TupleLabelTest(_fixtures.FixtureTest): 454 @classmethod 455 def setup_classes(cls): 456 pass 457 458 @classmethod 459 def setup_mappers(cls): 460 users, addresses, orders = cls.tables.users, cls.tables.addresses, cls.tables.orders 461 mapper(User, users, properties={ 462 'addresses':relationship(Address, backref='user', order_by=addresses.c.id), 463 'orders':relationship(Order, backref='user', order_by=orders.c.id), # o2m, m2o 464 }) 465 mapper(Address, addresses) 466 mapper(Order, orders, properties={ 467 'address':relationship(Address), # m2o 468 }) 469 470 def test_tuple_labeling(self): 471 users = self.tables.users 472 sess = create_session() 473 474 # test pickle + all the protocols ! 475 for pickled in False, -1, 0, 1, 2: 476 for row in sess.query(User, Address).join(User.addresses).all(): 477 if pickled is not False: 478 row = pickle.loads(pickle.dumps(row, pickled)) 479 480 eq_(list(row.keys()), ['User', 'Address']) 481 eq_(row.User, row[0]) 482 eq_(row.Address, row[1]) 483 484 for row in sess.query(User.name, User.id.label('foobar')): 485 if pickled is not False: 486 row = pickle.loads(pickle.dumps(row, pickled)) 487 eq_(list(row.keys()), ['name', 'foobar']) 488 eq_(row.name, row[0]) 489 eq_(row.foobar, row[1]) 490 491 for row in sess.query(User).values(User.name, User.id.label('foobar')): 492 if pickled is not False: 493 row = pickle.loads(pickle.dumps(row, pickled)) 494 eq_(list(row.keys()), ['name', 'foobar']) 495 eq_(row.name, row[0]) 496 eq_(row.foobar, row[1]) 497 498 oalias = aliased(Order) 499 for row in sess.query(User, oalias).join(User.orders).all(): 500 if pickled is not False: 501 row = pickle.loads(pickle.dumps(row, pickled)) 502 eq_(list(row.keys()), ['User']) 503 eq_(row.User, row[0]) 504 505 oalias = aliased(Order, name='orders') 506 for row in sess.query(User, oalias).join(oalias, User.orders).all(): 507 if pickled is not False: 508 row = pickle.loads(pickle.dumps(row, pickled)) 509 eq_(list(row.keys()), ['User', 'orders']) 510 eq_(row.User, row[0]) 511 eq_(row.orders, row[1]) 512 513 # test here that first col is not labeled, only 514 # one name in keys, matches correctly 515 for row in sess.query(User.name + 'hoho', User.name): 516 eq_(list(row.keys()), ['name']) 517 eq_(row[0], row.name + 'hoho') 518 519 if pickled is not False: 520 ret = sess.query(User, Address).join(User.addresses).all() 521 pickle.loads(pickle.dumps(ret, pickled)) 522 523class CustomSetupTeardownTest(fixtures.MappedTest): 524 @classmethod 525 def define_tables(cls, metadata): 526 Table('users', metadata, 527 Column('id', Integer, primary_key=True, test_needs_autoincrement=True), 528 Column('name', String(30), nullable=False), 529 test_needs_acid=True, 530 test_needs_fk=True 531 ) 532 533 Table('addresses', metadata, 534 Column('id', Integer, primary_key=True, test_needs_autoincrement=True), 535 Column('user_id', None, ForeignKey('users.id')), 536 Column('email_address', String(50), nullable=False), 537 test_needs_acid=True, 538 test_needs_fk=True 539 ) 540 def test_rebuild_state(self): 541 """not much of a 'test', but illustrate how to 542 remove instance-level state before pickling. 543 544 """ 545 546 users = self.tables.users 547 548 mapper(User, users) 549 550 u1 = User() 551 attributes.manager_of_class(User).teardown_instance(u1) 552 assert not u1.__dict__ 553 u2 = pickle.loads(pickle.dumps(u1)) 554 attributes.manager_of_class(User).setup_instance(u2) 555 assert attributes.instance_state(u2) 556 557