1"""General mapper operations with an emphasis on selecting/loading.""" 2 3from sqlalchemy.testing import assert_raises, assert_raises_message 4import sqlalchemy as sa 5from sqlalchemy import testing 6from sqlalchemy import MetaData, Integer, String, \ 7 ForeignKey, func, util, select 8from sqlalchemy.testing.schema import Table, Column 9from sqlalchemy.engine import default 10from sqlalchemy.orm import mapper, relationship, backref, \ 11 create_session, class_mapper, configure_mappers, reconstructor, \ 12 aliased, deferred, synonym, attributes, \ 13 column_property, composite, dynamic_loader, \ 14 comparable_property, Session 15from sqlalchemy.orm.persistence import _sort_states 16from sqlalchemy.testing import eq_, AssertsCompiledSQL, is_ 17from sqlalchemy.testing import fixtures 18from test.orm import _fixtures 19from sqlalchemy.testing.assertsql import CompiledSQL 20import logging 21import logging.handlers 22 23 24class MapperTest(_fixtures.FixtureTest, AssertsCompiledSQL): 25 __dialect__ = 'default' 26 27 def test_prop_shadow(self): 28 """A backref name may not shadow an existing property name.""" 29 30 Address, addresses, users, User = (self.classes.Address, 31 self.tables.addresses, 32 self.tables.users, 33 self.classes.User) 34 35 mapper(Address, addresses) 36 mapper(User, users, 37 properties={ 38 'addresses': relationship(Address, backref='email_address') 39 }) 40 assert_raises(sa.exc.ArgumentError, sa.orm.configure_mappers) 41 42 def test_update_attr_keys(self): 43 """test that update()/insert() use the correct key when given 44 InstrumentedAttributes.""" 45 46 User, users = self.classes.User, self.tables.users 47 48 mapper(User, users, properties={ 49 'foobar': users.c.name 50 }) 51 52 users.insert().values({User.foobar: 'name1'}).execute() 53 eq_(sa.select([User.foobar]).where(User.foobar == 'name1'). 54 execute().fetchall(), [('name1',)]) 55 56 users.update().values({User.foobar: User.foobar + 'foo'}).execute() 57 eq_(sa.select([User.foobar]).where(User.foobar == 'name1foo'). 58 execute().fetchall(), [('name1foo',)]) 59 60 def test_utils(self): 61 users = self.tables.users 62 addresses = self.tables.addresses 63 Address = self.classes.Address 64 65 from sqlalchemy.orm.base import _is_mapped_class, _is_aliased_class 66 67 class Foo(object): 68 x = "something" 69 70 @property 71 def y(self): 72 return "something else" 73 74 m = mapper(Foo, users, properties={"addresses": relationship(Address)}) 75 mapper(Address, addresses) 76 a1 = aliased(Foo) 77 78 f = Foo() 79 80 for fn, arg, ret in [ 81 (_is_mapped_class, Foo.x, False), 82 (_is_mapped_class, Foo.y, False), 83 (_is_mapped_class, Foo.name, False), 84 (_is_mapped_class, Foo.addresses, False), 85 (_is_mapped_class, Foo, True), 86 (_is_mapped_class, f, False), 87 (_is_mapped_class, a1, True), 88 (_is_mapped_class, m, True), 89 (_is_aliased_class, a1, True), 90 (_is_aliased_class, Foo.x, False), 91 (_is_aliased_class, Foo.y, False), 92 (_is_aliased_class, Foo, False), 93 (_is_aliased_class, f, False), 94 (_is_aliased_class, a1, True), 95 (_is_aliased_class, m, False), 96 ]: 97 assert fn(arg) == ret 98 99 def test_entity_descriptor(self): 100 users = self.tables.users 101 102 from sqlalchemy.orm.base import _entity_descriptor 103 104 class Foo(object): 105 x = "something" 106 107 @property 108 def y(self): 109 return "something else" 110 m = mapper(Foo, users) 111 a1 = aliased(Foo) 112 113 for arg, key, ret in [ 114 (m, "x", Foo.x), 115 (Foo, "x", Foo.x), 116 (a1, "x", a1.x), 117 (users, "name", users.c.name) 118 ]: 119 assert _entity_descriptor(arg, key) is ret 120 121 def test_friendly_attribute_str_on_uncompiled_boom(self): 122 User, users = self.classes.User, self.tables.users 123 124 def boom(): 125 raise Exception("it broke") 126 mapper(User, users, properties={ 127 'addresses': relationship(boom) 128 }) 129 130 # test that QueryableAttribute.__str__() doesn't 131 # cause a compile. 132 eq_(str(User.addresses), "User.addresses") 133 134 def test_exceptions_sticky(self): 135 """test preservation of mapper compile errors raised during hasattr(), 136 as well as for redundant mapper compile calls. Test that 137 repeated calls don't stack up error messages. 138 139 """ 140 141 Address, addresses, User = (self.classes.Address, 142 self.tables.addresses, 143 self.classes.User) 144 145 mapper(Address, addresses, properties={ 146 'user': relationship(User) 147 }) 148 149 try: 150 hasattr(Address.user, 'property') 151 except sa.orm.exc.UnmappedClassError: 152 assert util.compat.py32 153 154 for i in range(3): 155 assert_raises_message(sa.exc.InvalidRequestError, 156 "^One or more " 157 "mappers failed to initialize - can't " 158 "proceed with initialization of other " 159 r"mappers. Triggering mapper\: " 160 r"'Mapper\|Address\|addresses'." 161 " Original exception was: Class " 162 "'test.orm._fixtures.User' is not mapped$", 163 configure_mappers) 164 165 def test_column_prefix(self): 166 users, User = self.tables.users, self.classes.User 167 168 mapper(User, users, column_prefix='_', properties={ 169 'user_name': synonym('_name') 170 }) 171 172 s = create_session() 173 u = s.query(User).get(7) 174 eq_(u._name, 'jack') 175 eq_(u._id, 7) 176 u2 = s.query(User).filter_by(user_name='jack').one() 177 assert u is u2 178 179 def test_no_pks_1(self): 180 User, users = self.classes.User, self.tables.users 181 182 s = sa.select([users.c.name]).alias('foo') 183 assert_raises(sa.exc.ArgumentError, mapper, User, s) 184 185 def test_no_pks_2(self): 186 User, users = self.classes.User, self.tables.users 187 188 s = sa.select([users.c.name]).alias() 189 assert_raises(sa.exc.ArgumentError, mapper, User, s) 190 191 def test_reconfigure_on_other_mapper(self): 192 """A configure trigger on an already-configured mapper 193 still triggers a check against all mappers.""" 194 195 users, Address, addresses, User = (self.tables.users, 196 self.classes.Address, 197 self.tables.addresses, 198 self.classes.User) 199 200 mapper(User, users) 201 sa.orm.configure_mappers() 202 assert sa.orm.mapperlib.Mapper._new_mappers is False 203 204 m = mapper(Address, addresses, properties={ 205 'user': relationship(User, backref="addresses")}) 206 207 assert m.configured is False 208 assert sa.orm.mapperlib.Mapper._new_mappers is True 209 u = User() 210 assert User.addresses 211 assert sa.orm.mapperlib.Mapper._new_mappers is False 212 213 def test_configure_on_session(self): 214 User, users = self.classes.User, self.tables.users 215 216 m = mapper(User, users) 217 session = create_session() 218 session.connection(m) 219 220 def test_incomplete_columns(self): 221 """Loading from a select which does not contain all columns""" 222 223 addresses, Address = self.tables.addresses, self.classes.Address 224 225 mapper(Address, addresses) 226 s = create_session() 227 a = s.query(Address).from_statement( 228 sa.select([addresses.c.id, addresses.c.user_id]). 229 order_by(addresses.c.id)).first() 230 eq_(a.user_id, 7) 231 eq_(a.id, 1) 232 # email address auto-defers 233 assert 'email_addres' not in a.__dict__ 234 eq_(a.email_address, 'jack@bean.com') 235 236 def test_column_not_present(self): 237 users, addresses, User = (self.tables.users, 238 self.tables.addresses, 239 self.classes.User) 240 241 assert_raises_message(sa.exc.ArgumentError, 242 "not represented in the mapper's table", 243 mapper, User, users, 244 properties={'foo': addresses.c.user_id}) 245 246 def test_constructor_exc(self): 247 """TypeError is raised for illegal constructor args, 248 whether or not explicit __init__ is present [ticket:908].""" 249 250 users, addresses = self.tables.users, self.tables.addresses 251 252 class Foo(object): 253 254 def __init__(self): 255 pass 256 257 class Bar(object): 258 pass 259 260 mapper(Foo, users) 261 mapper(Bar, addresses) 262 assert_raises(TypeError, Foo, x=5) 263 assert_raises(TypeError, Bar, x=5) 264 265 def test_sort_states_comparisons(self): 266 """test that _sort_states() doesn't compare 267 insert_order to state.key, for set of mixed 268 persistent/pending. In particular Python 3 disallows 269 this. 270 271 """ 272 class Foo(object): 273 274 def __init__(self, id): 275 self.id = id 276 m = MetaData() 277 foo_t = Table('foo', m, 278 Column('id', String, primary_key=True) 279 ) 280 m = mapper(Foo, foo_t) 281 282 class DontCompareMeToString(int): 283 if util.py2k: 284 def __lt__(self, other): 285 assert not isinstance(other, basestring) 286 return int(self) < other 287 288 foos = [Foo(id='f%d' % i) for i in range(5)] 289 states = [attributes.instance_state(f) for f in foos] 290 291 for s in states[0:3]: 292 s.key = m._identity_key_from_state(s) 293 states[3].insert_order = DontCompareMeToString(5) 294 states[4].insert_order = DontCompareMeToString(1) 295 states[2].insert_order = DontCompareMeToString(3) 296 eq_( 297 _sort_states(states), 298 [states[4], states[3], states[0], states[1], states[2]] 299 ) 300 301 def test_props(self): 302 users, Address, addresses, User = (self.tables.users, 303 self.classes.Address, 304 self.tables.addresses, 305 self.classes.User) 306 307 m = mapper(User, users, properties={ 308 'addresses': relationship(mapper(Address, addresses)) 309 }) 310 assert User.addresses.property is m.get_property('addresses') 311 312 def test_unicode_relationship_backref_names(self): 313 # test [ticket:2901] 314 users, Address, addresses, User = (self.tables.users, 315 self.classes.Address, 316 self.tables.addresses, 317 self.classes.User) 318 319 mapper(Address, addresses) 320 mapper(User, users, properties={ 321 util.u('addresses'): relationship(Address, backref=util.u('user')) 322 }) 323 u1 = User() 324 a1 = Address() 325 u1.addresses.append(a1) 326 assert a1.user is u1 327 328 def test_configure_on_prop_1(self): 329 users, Address, addresses, User = (self.tables.users, 330 self.classes.Address, 331 self.tables.addresses, 332 self.classes.User) 333 334 mapper(User, users, properties={ 335 'addresses': relationship(mapper(Address, addresses)) 336 }) 337 User.addresses.any(Address.email_address == 'foo@bar.com') 338 339 def test_configure_on_prop_2(self): 340 users, Address, addresses, User = (self.tables.users, 341 self.classes.Address, 342 self.tables.addresses, 343 self.classes.User) 344 345 mapper(User, users, properties={ 346 'addresses': relationship(mapper(Address, addresses)) 347 }) 348 eq_(str(User.id == 3), str(users.c.id == 3)) 349 350 def test_configure_on_prop_3(self): 351 users, addresses, User = (self.tables.users, 352 self.tables.addresses, 353 self.classes.User) 354 355 class Foo(User): 356 pass 357 358 mapper(User, users) 359 mapper(Foo, addresses, inherits=User, properties={ 360 'address_id': addresses.c.id 361 }) 362 assert getattr(Foo().__class__, 'name').impl is not None 363 364 def test_deferred_subclass_attribute_instrument(self): 365 users, addresses, User = (self.tables.users, 366 self.tables.addresses, 367 self.classes.User) 368 369 class Foo(User): 370 pass 371 372 mapper(User, users) 373 configure_mappers() 374 mapper(Foo, addresses, inherits=User, properties={ 375 'address_id': addresses.c.id 376 }) 377 assert getattr(Foo().__class__, 'name').impl is not None 378 379 def test_class_hier_only_instrument_once_multiple_configure(self): 380 users, addresses = (self.tables.users, self.tables.addresses) 381 382 class A(object): 383 pass 384 385 class ASub(A): 386 pass 387 388 class ASubSub(ASub): 389 pass 390 391 class B(object): 392 pass 393 394 from sqlalchemy.testing import mock 395 from sqlalchemy.orm.attributes import register_attribute_impl 396 397 with mock.patch( 398 "sqlalchemy.orm.attributes.register_attribute_impl", 399 side_effect=register_attribute_impl 400 ) as some_mock: 401 402 mapper(A, users, properties={ 403 'bs': relationship(B) 404 }) 405 mapper(B, addresses) 406 407 configure_mappers() 408 409 mapper(ASub, inherits=A) 410 mapper(ASubSub, inherits=ASub) 411 412 configure_mappers() 413 414 b_calls = [ 415 c for c in some_mock.mock_calls if c[1][1] == 'bs' 416 ] 417 eq_(len(b_calls), 3) 418 419 def test_check_descriptor_as_method(self): 420 User, users = self.classes.User, self.tables.users 421 422 m = mapper(User, users) 423 424 class MyClass(User): 425 426 def foo(self): 427 pass 428 m._is_userland_descriptor(MyClass.foo) 429 430 def test_configure_on_get_props_1(self): 431 User, users = self.classes.User, self.tables.users 432 433 m = mapper(User, users) 434 assert not m.configured 435 assert list(m.iterate_properties) 436 assert m.configured 437 438 def test_configure_on_get_props_2(self): 439 User, users = self.classes.User, self.tables.users 440 441 m = mapper(User, users) 442 assert not m.configured 443 assert m.get_property('name') 444 assert m.configured 445 446 def test_configure_on_get_props_3(self): 447 users, Address, addresses, User = (self.tables.users, 448 self.classes.Address, 449 self.tables.addresses, 450 self.classes.User) 451 452 m = mapper(User, users) 453 assert not m.configured 454 configure_mappers() 455 456 m2 = mapper(Address, addresses, properties={ 457 'user': relationship(User, backref='addresses') 458 }) 459 assert m.get_property('addresses') 460 461 def test_info(self): 462 users = self.tables.users 463 Address = self.classes.Address 464 465 class MyComposite(object): 466 pass 467 for constructor, args in [ 468 (column_property, (users.c.name,)), 469 (relationship, (Address,)), 470 (composite, (MyComposite, 'id', 'name')), 471 (synonym, 'foo'), 472 (comparable_property, 'foo') 473 ]: 474 obj = constructor(info={"x": "y"}, *args) 475 eq_(obj.info, {"x": "y"}) 476 obj.info["q"] = "p" 477 eq_(obj.info, {"x": "y", "q": "p"}) 478 479 obj = constructor(*args) 480 eq_(obj.info, {}) 481 obj.info["q"] = "p" 482 eq_(obj.info, {"q": "p"}) 483 484 def test_info_via_instrumented(self): 485 m = MetaData() 486 # create specific tables here as we don't want 487 # users.c.id.info to be pre-initialized 488 users = Table('u', m, Column('id', Integer, primary_key=True), 489 Column('name', String)) 490 addresses = Table('a', m, Column('id', Integer, primary_key=True), 491 Column('name', String), 492 Column('user_id', Integer, ForeignKey('u.id'))) 493 Address = self.classes.Address 494 User = self.classes.User 495 496 mapper(User, users, properties={ 497 "name_lower": column_property(func.lower(users.c.name)), 498 "addresses": relationship(Address) 499 }) 500 mapper(Address, addresses) 501 502 # attr.info goes down to the original Column object 503 # for the dictionary. The annotated element needs to pass 504 # this on. 505 assert 'info' not in users.c.id.__dict__ 506 is_(User.id.info, users.c.id.info) 507 assert 'info' in users.c.id.__dict__ 508 509 # for SQL expressions, ORM-level .info 510 is_(User.name_lower.info, User.name_lower.property.info) 511 512 # same for relationships 513 is_(User.addresses.info, User.addresses.property.info) 514 515 def test_add_property(self): 516 users, addresses, Address = (self.tables.users, 517 self.tables.addresses, 518 self.classes.Address) 519 520 assert_col = [] 521 522 class User(fixtures.ComparableEntity): 523 524 def _get_name(self): 525 assert_col.append(('get', self._name)) 526 return self._name 527 528 def _set_name(self, name): 529 assert_col.append(('set', name)) 530 self._name = name 531 name = property(_get_name, _set_name) 532 533 def _uc_name(self): 534 if self._name is None: 535 return None 536 return self._name.upper() 537 uc_name = property(_uc_name) 538 uc_name2 = property(_uc_name) 539 540 m = mapper(User, users) 541 mapper(Address, addresses) 542 543 class UCComparator(sa.orm.PropComparator): 544 __hash__ = None 545 546 def __eq__(self, other): 547 cls = self.prop.parent.class_ 548 col = getattr(cls, 'name') 549 if other is None: 550 return col is None 551 else: 552 return sa.func.upper(col) == sa.func.upper(other) 553 554 m.add_property('_name', deferred(users.c.name)) 555 m.add_property('name', synonym('_name')) 556 m.add_property('addresses', relationship(Address)) 557 m.add_property('uc_name', sa.orm.comparable_property(UCComparator)) 558 m.add_property('uc_name2', sa.orm.comparable_property( 559 UCComparator, User.uc_name2)) 560 561 sess = create_session(autocommit=False) 562 assert sess.query(User).get(7) 563 564 u = sess.query(User).filter_by(name='jack').one() 565 566 def go(): 567 eq_(len(u.addresses), 568 len(self.static.user_address_result[0].addresses)) 569 eq_(u.name, 'jack') 570 eq_(u.uc_name, 'JACK') 571 eq_(u.uc_name2, 'JACK') 572 eq_(assert_col, [('get', 'jack')], str(assert_col)) 573 self.sql_count_(2, go) 574 575 u.name = 'ed' 576 u3 = User() 577 u3.name = 'some user' 578 sess.add(u3) 579 sess.flush() 580 sess.rollback() 581 582 def test_add_prop_via_backref_resets_memoizations_reconfigures(self): 583 users, User = self.tables.users, self.classes.User 584 addresses, Address = self.tables.addresses, self.classes.Address 585 586 m1 = mapper(User, users) 587 User() 588 589 m2 = mapper(Address, addresses, properties={ 590 'user': relationship(User, backref="addresses") 591 }) 592 # configure mappers takes place when User is generated 593 User() 594 assert hasattr(User, 'addresses') 595 assert "addresses" in [p.key for p in m1._polymorphic_properties] 596 597 def test_replace_col_prop_w_syn(self): 598 users, User = self.tables.users, self.classes.User 599 600 m = mapper(User, users) 601 m.add_property('_name', users.c.name) 602 m.add_property('name', synonym('_name')) 603 604 sess = create_session() 605 u = sess.query(User).filter_by(name='jack').one() 606 eq_(u._name, 'jack') 607 eq_(u.name, 'jack') 608 u.name = 'jacko' 609 assert m._columntoproperty[users.c.name] is m.get_property('_name') 610 611 sa.orm.clear_mappers() 612 613 m = mapper(User, users) 614 m.add_property('name', synonym('_name', map_column=True)) 615 616 sess.expunge_all() 617 u = sess.query(User).filter_by(name='jack').one() 618 eq_(u._name, 'jack') 619 eq_(u.name, 'jack') 620 u.name = 'jacko' 621 assert m._columntoproperty[users.c.name] is m.get_property('_name') 622 623 def test_replace_rel_prop_with_rel_warns(self): 624 users, User = self.tables.users, self.classes.User 625 addresses, Address = self.tables.addresses, self.classes.Address 626 627 m = mapper(User, users, properties={ 628 "addresses": relationship(Address) 629 }) 630 mapper(Address, addresses) 631 632 assert_raises_message( 633 sa.exc.SAWarning, 634 "Property User.addresses on Mapper|User|users being replaced " 635 "with new property User.addresses; the old property will " 636 "be discarded", 637 m.add_property, 638 "addresses", relationship(Address) 639 ) 640 641 def test_add_column_prop_deannotate(self): 642 User, users = self.classes.User, self.tables.users 643 Address, addresses = self.classes.Address, self.tables.addresses 644 645 class SubUser(User): 646 pass 647 m = mapper(User, users) 648 m2 = mapper(SubUser, addresses, inherits=User, properties={ 649 'address_id': addresses.c.id 650 }) 651 m3 = mapper(Address, addresses, properties={ 652 'foo': relationship(m2) 653 }) 654 # add property using annotated User.name, 655 # needs to be deannotated 656 m.add_property("x", column_property(User.name + "name")) 657 s = create_session() 658 q = s.query(m2).select_from(Address).join(Address.foo) 659 self.assert_compile( 660 q, 661 "SELECT " 662 "addresses_1.id AS addresses_1_id, " 663 "users_1.id AS users_1_id, " 664 "users_1.name AS users_1_name, " 665 "addresses_1.user_id AS addresses_1_user_id, " 666 "addresses_1.email_address AS " 667 "addresses_1_email_address, " 668 "users_1.name || :name_1 AS anon_1 " 669 "FROM addresses JOIN (users AS users_1 JOIN addresses " 670 "AS addresses_1 ON users_1.id = " 671 "addresses_1.user_id) ON " 672 "users_1.id = addresses.user_id" 673 ) 674 675 def test_column_prop_deannotate(self): 676 """test that column property deannotates, 677 bringing expressions down to the original mapped columns. 678 """ 679 User, users = self.classes.User, self.tables.users 680 m = mapper(User, users) 681 assert User.id.property.columns[0] is users.c.id 682 assert User.name.property.columns[0] is users.c.name 683 expr = User.name + "name" 684 expr2 = sa.select([User.name, users.c.id]) 685 m.add_property("x", column_property(expr)) 686 m.add_property("y", column_property(expr2)) 687 688 assert User.x.property.columns[0] is not expr 689 assert User.x.property.columns[0].element.left is users.c.name 690 # a deannotate needs to clone the base, in case 691 # the original one referenced annotated elements. 692 assert User.x.property.columns[0].element.right is not expr.right 693 694 assert User.y.property.columns[0] is not expr2 695 assert User.y.property.columns[0].element.\ 696 _raw_columns[0] is users.c.name 697 assert User.y.property.columns[0].element.\ 698 _raw_columns[1] is users.c.id 699 700 def test_synonym_replaces_backref(self): 701 addresses, users, User = (self.tables.addresses, 702 self.tables.users, 703 self.classes.User) 704 705 assert_calls = [] 706 707 class Address(object): 708 709 def _get_user(self): 710 assert_calls.append("get") 711 return self._user 712 713 def _set_user(self, user): 714 assert_calls.append("set") 715 self._user = user 716 user = property(_get_user, _set_user) 717 718 # synonym is created against nonexistent prop 719 mapper(Address, addresses, properties={ 720 'user': synonym('_user') 721 }) 722 sa.orm.configure_mappers() 723 724 # later, backref sets up the prop 725 mapper(User, users, properties={ 726 'addresses': relationship(Address, backref='_user') 727 }) 728 729 sess = create_session() 730 u1 = sess.query(User).get(7) 731 u2 = sess.query(User).get(8) 732 # comparaison ops need to work 733 a1 = sess.query(Address).filter(Address.user == u1).one() 734 eq_(a1.id, 1) 735 a1.user = u2 736 assert a1.user is u2 737 eq_(assert_calls, ["set", "get"]) 738 739 def test_self_ref_synonym(self): 740 t = Table('nodes', MetaData(), 741 Column( 742 'id', Integer, primary_key=True, 743 test_needs_autoincrement=True), 744 Column('parent_id', Integer, ForeignKey('nodes.id'))) 745 746 class Node(object): 747 pass 748 749 mapper(Node, t, properties={ 750 '_children': relationship( 751 Node, backref=backref('_parent', remote_side=t.c.id)), 752 'children': synonym('_children'), 753 'parent': synonym('_parent') 754 }) 755 756 n1 = Node() 757 n2 = Node() 758 n1.children.append(n2) 759 assert n2.parent is n2._parent is n1 760 assert n1.children[0] is n1._children[0] is n2 761 eq_(str(Node.parent == n2), ":param_1 = nodes.parent_id") 762 763 def test_non_primary_identity_class(self): 764 User = self.classes.User 765 users, addresses = self.tables.users, self.tables.addresses 766 767 class AddressUser(User): 768 pass 769 m1 = mapper(User, users, polymorphic_identity='user') 770 m2 = mapper(AddressUser, addresses, inherits=User, 771 polymorphic_identity='address', properties={ 772 'address_id': addresses.c.id 773 }) 774 m3 = mapper(AddressUser, addresses, non_primary=True) 775 assert m3._identity_class is m2._identity_class 776 eq_( 777 m2.identity_key_from_instance(AddressUser()), 778 m3.identity_key_from_instance(AddressUser()) 779 ) 780 781 def test_reassign_polymorphic_identity_warns(self): 782 User = self.classes.User 783 users = self.tables.users 784 785 class MyUser(User): 786 pass 787 m1 = mapper(User, users, polymorphic_on=users.c.name, 788 polymorphic_identity='user') 789 assert_raises_message( 790 sa.exc.SAWarning, 791 "Reassigning polymorphic association for identity 'user'", 792 mapper, 793 MyUser, users, inherits=User, polymorphic_identity='user' 794 ) 795 796 def test_illegal_non_primary(self): 797 users, Address, addresses, User = (self.tables.users, 798 self.classes.Address, 799 self.tables.addresses, 800 self.classes.User) 801 802 mapper(User, users) 803 mapper(Address, addresses) 804 mapper(User, users, non_primary=True, properties={ 805 'addresses': relationship(Address) 806 }) 807 assert_raises_message( 808 sa.exc.ArgumentError, 809 "Attempting to assign a new relationship 'addresses' " 810 "to a non-primary mapper on class 'User'", 811 configure_mappers 812 ) 813 814 def test_illegal_non_primary_2(self): 815 User, users = self.classes.User, self.tables.users 816 817 assert_raises_message( 818 sa.exc.InvalidRequestError, 819 "Configure a primary mapper first", 820 mapper, User, users, non_primary=True) 821 822 def test_illegal_non_primary_3(self): 823 users, addresses = self.tables.users, self.tables.addresses 824 825 class Base(object): 826 pass 827 828 class Sub(Base): 829 pass 830 mapper(Base, users) 831 assert_raises_message(sa.exc.InvalidRequestError, 832 "Configure a primary mapper first", 833 mapper, Sub, addresses, non_primary=True 834 ) 835 836 def test_prop_filters(self): 837 t = Table('person', MetaData(), 838 Column('id', Integer, primary_key=True, 839 test_needs_autoincrement=True), 840 Column('type', String(128)), 841 Column('name', String(128)), 842 Column('employee_number', Integer), 843 Column('boss_id', Integer, ForeignKey('person.id')), 844 Column('vendor_id', Integer)) 845 846 class Person(object): 847 pass 848 849 class Vendor(Person): 850 pass 851 852 class Employee(Person): 853 pass 854 855 class Manager(Employee): 856 pass 857 858 class Hoho(object): 859 pass 860 861 class Lala(object): 862 pass 863 864 class Fub(object): 865 pass 866 867 class Frob(object): 868 pass 869 870 class HasDef(object): 871 872 def name(self): 873 pass 874 875 class Empty(object): 876 pass 877 878 mapper( 879 Empty, t, properties={'empty_id': t.c.id}, 880 include_properties=[]) 881 p_m = mapper(Person, t, polymorphic_on=t.c.type, 882 include_properties=('id', 'type', 'name')) 883 e_m = mapper(Employee, inherits=p_m, 884 polymorphic_identity='employee', 885 properties={ 886 'boss': relationship( 887 Manager, backref=backref('peon'), 888 remote_side=t.c.id)}, 889 exclude_properties=('vendor_id', )) 890 891 mapper( 892 Manager, inherits=e_m, polymorphic_identity='manager', 893 include_properties=('id', 'type')) 894 895 mapper( 896 Vendor, inherits=p_m, polymorphic_identity='vendor', 897 exclude_properties=('boss_id', 'employee_number')) 898 mapper(Hoho, t, include_properties=('id', 'type', 'name')) 899 mapper( 900 Lala, t, exclude_properties=('vendor_id', 'boss_id'), 901 column_prefix="p_") 902 903 mapper(HasDef, t, column_prefix="h_") 904 905 mapper(Fub, t, include_properties=(t.c.id, t.c.type)) 906 mapper( 907 Frob, t, column_prefix='f_', 908 exclude_properties=( 909 t.c.boss_id, 910 'employee_number', t.c.vendor_id)) 911 912 configure_mappers() 913 914 def assert_props(cls, want): 915 have = set([n for n in dir(cls) if not n.startswith('_')]) 916 want = set(want) 917 eq_(have, want) 918 919 def assert_instrumented(cls, want): 920 have = set([p.key for p in class_mapper(cls).iterate_properties]) 921 want = set(want) 922 eq_(have, want) 923 924 assert_props(HasDef, ['h_boss_id', 'h_employee_number', 'h_id', 925 'name', 'h_name', 'h_vendor_id', 'h_type']) 926 assert_props(Person, ['id', 'name', 'type']) 927 assert_instrumented(Person, ['id', 'name', 'type']) 928 assert_props(Employee, ['boss', 'boss_id', 'employee_number', 929 'id', 'name', 'type']) 930 assert_instrumented(Employee, ['boss', 'boss_id', 'employee_number', 931 'id', 'name', 'type']) 932 assert_props(Manager, ['boss', 'boss_id', 'employee_number', 'peon', 933 'id', 'name', 'type']) 934 935 # 'peon' and 'type' are both explicitly stated properties 936 assert_instrumented(Manager, ['peon', 'type', 'id']) 937 938 assert_props(Vendor, ['vendor_id', 'id', 'name', 'type']) 939 assert_props(Hoho, ['id', 'name', 'type']) 940 assert_props(Lala, ['p_employee_number', 'p_id', 'p_name', 'p_type']) 941 assert_props(Fub, ['id', 'type']) 942 assert_props(Frob, ['f_id', 'f_type', 'f_name', ]) 943 944 # putting the discriminator column in exclude_properties, 945 # very weird. As of 0.7.4 this re-maps it. 946 class Foo(Person): 947 pass 948 assert_props(Empty, ['empty_id']) 949 950 mapper( 951 Foo, inherits=Person, polymorphic_identity='foo', 952 exclude_properties=('type', ), 953 ) 954 assert hasattr(Foo, "type") 955 assert Foo.type.property.columns[0] is t.c.type 956 957 @testing.provide_metadata 958 def test_prop_filters_defaults(self): 959 metadata = self.metadata 960 t = Table('t', metadata, 961 Column( 962 'id', Integer(), primary_key=True, 963 test_needs_autoincrement=True), 964 Column('x', Integer(), nullable=False, server_default='0') 965 ) 966 t.create() 967 968 class A(object): 969 pass 970 mapper(A, t, include_properties=['id']) 971 s = Session() 972 s.add(A()) 973 s.commit() 974 975 def test_we_dont_call_bool(self): 976 class NoBoolAllowed(object): 977 978 def __bool__(self): 979 raise Exception("nope") 980 mapper(NoBoolAllowed, self.tables.users) 981 u1 = NoBoolAllowed() 982 u1.name = "some name" 983 s = Session(testing.db) 984 s.add(u1) 985 s.commit() 986 assert s.query(NoBoolAllowed).get(u1.id) is u1 987 988 def test_we_dont_call_eq(self): 989 class NoEqAllowed(object): 990 991 def __eq__(self, other): 992 raise Exception("nope") 993 994 addresses, users = self.tables.addresses, self.tables.users 995 Address = self.classes.Address 996 997 mapper(NoEqAllowed, users, properties={ 998 'addresses': relationship(Address, backref='user') 999 }) 1000 mapper(Address, addresses) 1001 1002 u1 = NoEqAllowed() 1003 u1.name = "some name" 1004 u1.addresses = [Address(id=12, email_address='a1')] 1005 s = Session(testing.db) 1006 s.add(u1) 1007 s.commit() 1008 1009 a1 = s.query(Address).filter_by(id=12).one() 1010 assert a1.user is u1 1011 1012 def test_mapping_to_join_raises(self): 1013 """Test implicit merging of two cols raises.""" 1014 1015 addresses, users, User = (self.tables.addresses, 1016 self.tables.users, 1017 self.classes.User) 1018 1019 usersaddresses = sa.join(users, addresses, 1020 users.c.id == addresses.c.user_id) 1021 assert_raises_message( 1022 sa.exc.InvalidRequestError, 1023 "Implicitly", 1024 mapper, User, usersaddresses, primary_key=[users.c.id] 1025 ) 1026 1027 def test_mapping_to_join_explicit_prop(self): 1028 """Mapping to a join""" 1029 1030 User, addresses, users = (self.classes.User, 1031 self.tables.addresses, 1032 self.tables.users) 1033 1034 usersaddresses = sa.join(users, addresses, users.c.id 1035 == addresses.c.user_id) 1036 mapper(User, usersaddresses, primary_key=[users.c.id], 1037 properties={'add_id': addresses.c.id} 1038 ) 1039 result = create_session().query(User).order_by(users.c.id).all() 1040 eq_(result, self.static.user_result[:3]) 1041 1042 def test_mapping_to_join_exclude_prop(self): 1043 """Mapping to a join""" 1044 1045 User, addresses, users = (self.classes.User, 1046 self.tables.addresses, 1047 self.tables.users) 1048 1049 usersaddresses = sa.join(users, addresses, users.c.id 1050 == addresses.c.user_id) 1051 mapper(User, usersaddresses, primary_key=[users.c.id], 1052 exclude_properties=[addresses.c.id] 1053 ) 1054 result = create_session().query(User).order_by(users.c.id).all() 1055 eq_(result, self.static.user_result[:3]) 1056 1057 def test_mapping_to_join_no_pk(self): 1058 email_bounces, addresses, Address = (self.tables.email_bounces, 1059 self.tables.addresses, 1060 self.classes.Address) 1061 1062 m = mapper(Address, 1063 addresses.join(email_bounces), 1064 properties={'id': [addresses.c.id, email_bounces.c.id]} 1065 ) 1066 configure_mappers() 1067 assert addresses in m._pks_by_table 1068 assert email_bounces not in m._pks_by_table 1069 1070 sess = create_session() 1071 a = Address(id=10, email_address='e1') 1072 sess.add(a) 1073 sess.flush() 1074 1075 eq_( 1076 select([func.count('*')]).select_from(addresses).scalar(), 6) 1077 eq_( 1078 select([func.count('*')]).select_from(email_bounces).scalar(), 5) 1079 1080 def test_mapping_to_outerjoin(self): 1081 """Mapping to an outer join with a nullable composite primary key.""" 1082 1083 users, addresses, User = (self.tables.users, 1084 self.tables.addresses, 1085 self.classes.User) 1086 1087 mapper(User, users.outerjoin(addresses), 1088 primary_key=[users.c.id, addresses.c.id], 1089 properties=dict( 1090 address_id=addresses.c.id)) 1091 1092 session = create_session() 1093 result = session.query(User).order_by(User.id, User.address_id).all() 1094 1095 eq_(result, [ 1096 User(id=7, address_id=1), 1097 User(id=8, address_id=2), 1098 User(id=8, address_id=3), 1099 User(id=8, address_id=4), 1100 User(id=9, address_id=5), 1101 User(id=10, address_id=None)]) 1102 1103 def test_mapping_to_outerjoin_no_partial_pks(self): 1104 """test the allow_partial_pks=False flag.""" 1105 1106 users, addresses, User = (self.tables.users, 1107 self.tables.addresses, 1108 self.classes.User) 1109 1110 mapper(User, users.outerjoin(addresses), 1111 allow_partial_pks=False, 1112 primary_key=[users.c.id, addresses.c.id], 1113 properties=dict( 1114 address_id=addresses.c.id)) 1115 1116 session = create_session() 1117 result = session.query(User).order_by(User.id, User.address_id).all() 1118 1119 eq_(result, [ 1120 User(id=7, address_id=1), 1121 User(id=8, address_id=2), 1122 User(id=8, address_id=3), 1123 User(id=8, address_id=4), 1124 User(id=9, address_id=5), 1125 None]) 1126 1127 def test_scalar_pk_arg(self): 1128 users, Keyword, items, Item, User, keywords = (self.tables.users, 1129 self.classes.Keyword, 1130 self.tables.items, 1131 self.classes.Item, 1132 self.classes.User, 1133 self.tables.keywords) 1134 1135 m1 = mapper(Item, items, primary_key=[items.c.id]) 1136 m2 = mapper(Keyword, keywords, primary_key=keywords.c.id) 1137 m3 = mapper(User, users, primary_key=(users.c.id,)) 1138 1139 assert m1.primary_key[0] is items.c.id 1140 assert m2.primary_key[0] is keywords.c.id 1141 assert m3.primary_key[0] is users.c.id 1142 1143 def test_custom_join(self): 1144 """select_from totally replace the FROM parameters.""" 1145 1146 users, items, order_items, orders, Item, User, Order = ( 1147 self.tables.users, 1148 self.tables.items, 1149 self.tables.order_items, 1150 self.tables.orders, 1151 self.classes.Item, 1152 self.classes.User, 1153 self.classes.Order) 1154 1155 mapper(Item, items) 1156 1157 mapper(Order, orders, properties=dict( 1158 items=relationship(Item, order_items))) 1159 1160 mapper(User, users, properties=dict( 1161 orders=relationship(Order))) 1162 1163 session = create_session() 1164 result = (session.query(User). 1165 select_from(users.join(orders). 1166 join(order_items). 1167 join(items)). 1168 filter(items.c.description == 'item 4')).all() 1169 1170 eq_(result, [self.static.user_result[0]]) 1171 1172 @testing.uses_deprecated("Mapper.order_by") 1173 def test_cancel_order_by(self): 1174 users, User = self.tables.users, self.classes.User 1175 1176 mapper(User, users, order_by=users.c.name.desc()) 1177 1178 assert "order by users.name desc" in \ 1179 str(create_session().query(User).statement).lower() 1180 assert "order by" not in \ 1181 str(create_session().query(User).order_by(None).statement).lower() 1182 assert "order by users.name asc" in \ 1183 str(create_session().query(User).order_by( 1184 User.name.asc()).statement).lower() 1185 1186 eq_( 1187 create_session().query(User).all(), 1188 [User(id=7, name='jack'), User(id=9, name='fred'), 1189 User(id=8, name='ed'), User(id=10, name='chuck')] 1190 ) 1191 1192 eq_( 1193 create_session().query(User).order_by(User.name).all(), 1194 [User(id=10, name='chuck'), User(id=8, name='ed'), 1195 User(id=9, name='fred'), User(id=7, name='jack')] 1196 ) 1197 1198 # 'Raises a "expression evaluation not supported" error at prepare time 1199 @testing.fails_on('firebird', 'FIXME: unknown') 1200 def test_function(self): 1201 """Mapping to a SELECT statement that has functions in it.""" 1202 1203 addresses, users, User = (self.tables.addresses, 1204 self.tables.users, 1205 self.classes.User) 1206 1207 s = sa.select([users, 1208 (users.c.id * 2).label('concat'), 1209 sa.func.count(addresses.c.id).label('count')], 1210 users.c.id == addresses.c.user_id, 1211 group_by=[c for c in users.c]).alias('myselect') 1212 1213 mapper(User, s) 1214 sess = create_session() 1215 result = sess.query(User).order_by(s.c.id).all() 1216 1217 for idx, total in enumerate((14, 16)): 1218 eq_(result[idx].concat, result[idx].id * 2) 1219 eq_(result[idx].concat, total) 1220 1221 def test_count(self): 1222 """The count function on Query.""" 1223 1224 User, users = self.classes.User, self.tables.users 1225 1226 mapper(User, users) 1227 1228 session = create_session() 1229 q = session.query(User) 1230 1231 eq_(q.count(), 4) 1232 eq_(q.filter(User.id.in_([8, 9])).count(), 2) 1233 eq_(q.filter(users.c.id.in_([8, 9])).count(), 2) 1234 1235 eq_(session.query(User.id).count(), 4) 1236 eq_(session.query(User.id).filter(User.id.in_((8, 9))).count(), 2) 1237 1238 def test_many_to_many_count(self): 1239 keywords, items, item_keywords, Keyword, Item = ( 1240 self.tables.keywords, 1241 self.tables.items, 1242 self.tables.item_keywords, 1243 self.classes.Keyword, 1244 self.classes.Item) 1245 1246 mapper(Keyword, keywords) 1247 mapper(Item, items, properties=dict( 1248 keywords=relationship(Keyword, item_keywords, lazy='select'))) 1249 1250 session = create_session() 1251 q = (session.query(Item). 1252 join('keywords'). 1253 distinct(). 1254 filter(Keyword.name == "red")) 1255 eq_(q.count(), 2) 1256 1257 def test_override_1(self): 1258 """Overriding a column raises an error.""" 1259 1260 Address, addresses, users, User = (self.classes.Address, 1261 self.tables.addresses, 1262 self.tables.users, 1263 self.classes.User) 1264 1265 def go(): 1266 mapper(User, users, 1267 properties=dict( 1268 name=relationship(mapper(Address, addresses)))) 1269 1270 assert_raises(sa.exc.ArgumentError, go) 1271 1272 def test_override_2(self): 1273 """exclude_properties cancels the error.""" 1274 1275 Address, addresses, users, User = (self.classes.Address, 1276 self.tables.addresses, 1277 self.tables.users, 1278 self.classes.User) 1279 1280 mapper(User, users, 1281 exclude_properties=['name'], 1282 properties=dict( 1283 name=relationship(mapper(Address, addresses)))) 1284 1285 assert bool(User.name) 1286 1287 def test_override_3(self): 1288 """The column being named elsewhere also cancels the error,""" 1289 1290 Address, addresses, users, User = (self.classes.Address, 1291 self.tables.addresses, 1292 self.tables.users, 1293 self.classes.User) 1294 1295 mapper(User, users, 1296 properties=dict( 1297 name=relationship(mapper(Address, addresses)), 1298 foo=users.c.name)) 1299 1300 def test_synonym(self): 1301 users, addresses, Address = (self.tables.users, 1302 self.tables.addresses, 1303 self.classes.Address) 1304 1305 assert_col = [] 1306 1307 class extendedproperty(property): 1308 attribute = 123 1309 1310 class User(object): 1311 1312 def _get_name(self): 1313 assert_col.append(('get', self.name)) 1314 return self.name 1315 1316 def _set_name(self, name): 1317 assert_col.append(('set', name)) 1318 self.name = name 1319 uname = extendedproperty(_get_name, _set_name) 1320 1321 mapper(User, users, properties=dict( 1322 addresses=relationship(mapper(Address, addresses), lazy='select'), 1323 uname=synonym('name'), 1324 adlist=synonym('addresses'), 1325 adname=synonym('addresses') 1326 )) 1327 1328 # ensure the synonym can get at the proxied comparators without 1329 # an explicit compile 1330 User.name == 'ed' 1331 User.adname.any() 1332 1333 assert hasattr(User, 'adlist') 1334 # as of 0.4.2, synonyms always create a property 1335 assert hasattr(User, 'adname') 1336 1337 # test compile 1338 assert not isinstance(User.uname == 'jack', bool) 1339 1340 assert User.uname.property 1341 assert User.adlist.property 1342 1343 sess = create_session() 1344 1345 # test RowTuple names 1346 row = sess.query(User.id, User.uname).first() 1347 assert row.uname == row[1] 1348 1349 u = sess.query(User).filter(User.uname == 'jack').one() 1350 1351 fixture = self.static.user_address_result[0].addresses 1352 eq_(u.adlist, fixture) 1353 1354 addr = sess.query(Address).filter_by(id=fixture[0].id).one() 1355 u = sess.query(User).filter(User.adname.contains(addr)).one() 1356 u2 = sess.query(User).filter(User.adlist.contains(addr)).one() 1357 1358 assert u is u2 1359 1360 assert u not in sess.dirty 1361 u.uname = "some user name" 1362 assert len(assert_col) > 0 1363 eq_(assert_col, [('set', 'some user name')]) 1364 eq_(u.uname, "some user name") 1365 eq_(assert_col, [('set', 'some user name'), ('get', 'some user name')]) 1366 eq_(u.name, "some user name") 1367 assert u in sess.dirty 1368 1369 eq_(User.uname.attribute, 123) 1370 1371 def test_synonym_of_synonym(self): 1372 users, User = (self.tables.users, 1373 self.classes.User) 1374 1375 mapper(User, users, properties={ 1376 'x': synonym('id'), 1377 'y': synonym('x') 1378 }) 1379 1380 s = Session() 1381 u = s.query(User).filter(User.y == 8).one() 1382 eq_(u.y, 8) 1383 1384 def test_synonym_of_non_property_raises(self): 1385 from sqlalchemy.ext.associationproxy import association_proxy 1386 1387 class User(object): 1388 pass 1389 1390 users, Address, addresses = ( 1391 self.tables.users, 1392 self.classes.Address, 1393 self.tables.addresses) 1394 1395 mapper(User, users, properties={ 1396 'y': synonym('x'), 1397 'addresses': relationship(Address) 1398 }) 1399 mapper(Address, addresses) 1400 User.x = association_proxy("addresses", "email_address") 1401 1402 assert_raises_message( 1403 sa.exc.InvalidRequestError, 1404 r'synonym\(\) attribute "User.x" only supports ORM mapped ' 1405 'attributes, got .*AssociationProxy', 1406 getattr, User.y, "property" 1407 ) 1408 1409 def test_synonym_column_location(self): 1410 users, User = self.tables.users, self.classes.User 1411 1412 def go(): 1413 mapper(User, users, properties={ 1414 'not_name': synonym('_name', map_column=True)}) 1415 1416 assert_raises_message( 1417 sa.exc.ArgumentError, 1418 ("Can't compile synonym '_name': no column on table " 1419 "'users' named 'not_name'"), 1420 go) 1421 1422 def test_column_synonyms(self): 1423 """Synonyms which automatically instrument properties, 1424 set up aliased column, etc.""" 1425 1426 addresses, users, Address = (self.tables.addresses, 1427 self.tables.users, 1428 self.classes.Address) 1429 1430 assert_col = [] 1431 1432 class User(object): 1433 1434 def _get_name(self): 1435 assert_col.append(('get', self._name)) 1436 return self._name 1437 1438 def _set_name(self, name): 1439 assert_col.append(('set', name)) 1440 self._name = name 1441 name = property(_get_name, _set_name) 1442 1443 mapper(Address, addresses) 1444 mapper(User, users, properties={ 1445 'addresses': relationship(Address, lazy='select'), 1446 'name': synonym('_name', map_column=True) 1447 }) 1448 1449 # test compile 1450 assert not isinstance(User.name == 'jack', bool) 1451 1452 assert hasattr(User, 'name') 1453 assert hasattr(User, '_name') 1454 1455 sess = create_session() 1456 u = sess.query(User).filter(User.name == 'jack').one() 1457 eq_(u.name, 'jack') 1458 u.name = 'foo' 1459 eq_(u.name, 'foo') 1460 eq_(assert_col, [('get', 'jack'), ('set', 'foo'), ('get', 'foo')]) 1461 1462 def test_synonym_map_column_conflict(self): 1463 users, User = self.tables.users, self.classes.User 1464 1465 assert_raises( 1466 sa.exc.ArgumentError, 1467 mapper, 1468 User, users, properties=util.OrderedDict([ 1469 ('_user_id', users.c.id), 1470 ('id', synonym('_user_id', map_column=True)), 1471 ]) 1472 ) 1473 1474 assert_raises( 1475 sa.exc.ArgumentError, 1476 mapper, 1477 User, users, properties=util.OrderedDict([ 1478 ('id', synonym('_user_id', map_column=True)), 1479 ('_user_id', users.c.id), 1480 ]) 1481 ) 1482 1483 def test_comparable(self): 1484 users = self.tables.users 1485 1486 class extendedproperty(property): 1487 attribute = 123 1488 1489 def method1(self): 1490 return "method1" 1491 1492 from sqlalchemy.orm.properties import ColumnProperty 1493 1494 class UCComparator(ColumnProperty.Comparator): 1495 __hash__ = None 1496 1497 def method1(self): 1498 return "uccmethod1" 1499 1500 def method2(self, other): 1501 return "method2" 1502 1503 def __eq__(self, other): 1504 cls = self.prop.parent.class_ 1505 col = getattr(cls, 'name') 1506 if other is None: 1507 return col is None 1508 else: 1509 return sa.func.upper(col) == sa.func.upper(other) 1510 1511 def map_(with_explicit_property): 1512 class User(object): 1513 1514 @extendedproperty 1515 def uc_name(self): 1516 if self.name is None: 1517 return None 1518 return self.name.upper() 1519 if with_explicit_property: 1520 args = (UCComparator, User.uc_name) 1521 else: 1522 args = (UCComparator,) 1523 mapper(User, users, properties=dict( 1524 uc_name=sa.orm.comparable_property(*args))) 1525 return User 1526 1527 for User in (map_(True), map_(False)): 1528 sess = create_session() 1529 sess.begin() 1530 q = sess.query(User) 1531 1532 assert hasattr(User, 'name') 1533 assert hasattr(User, 'uc_name') 1534 1535 eq_(User.uc_name.method1(), "method1") 1536 eq_(User.uc_name.method2('x'), "method2") 1537 1538 assert_raises_message( 1539 AttributeError, 1540 "Neither 'extendedproperty' object nor 'UCComparator' " 1541 "object associated with User.uc_name has an attribute " 1542 "'nonexistent'", 1543 getattr, User.uc_name, 'nonexistent') 1544 1545 # test compile 1546 assert not isinstance(User.uc_name == 'jack', bool) 1547 u = q.filter(User.uc_name == 'JACK').one() 1548 1549 assert u.uc_name == "JACK" 1550 assert u not in sess.dirty 1551 1552 u.name = "some user name" 1553 eq_(u.name, "some user name") 1554 assert u in sess.dirty 1555 eq_(u.uc_name, "SOME USER NAME") 1556 1557 sess.flush() 1558 sess.expunge_all() 1559 1560 q = sess.query(User) 1561 u2 = q.filter(User.name == 'some user name').one() 1562 u3 = q.filter(User.uc_name == 'SOME USER NAME').one() 1563 1564 assert u2 is u3 1565 1566 eq_(User.uc_name.attribute, 123) 1567 sess.rollback() 1568 1569 def test_comparable_column(self): 1570 users, User = self.tables.users, self.classes.User 1571 1572 class MyComparator(sa.orm.properties.ColumnProperty.Comparator): 1573 __hash__ = None 1574 1575 def __eq__(self, other): 1576 # lower case comparison 1577 return func.lower(self.__clause_element__() 1578 ) == func.lower(other) 1579 1580 def intersects(self, other): 1581 # non-standard comparator 1582 return self.__clause_element__().op('&=')(other) 1583 1584 mapper(User, users, properties={ 1585 'name': sa.orm.column_property(users.c.name, 1586 comparator_factory=MyComparator) 1587 }) 1588 1589 assert_raises_message( 1590 AttributeError, 1591 "Neither 'InstrumentedAttribute' object nor " 1592 "'MyComparator' object associated with User.name has " 1593 "an attribute 'nonexistent'", 1594 getattr, User.name, "nonexistent") 1595 1596 eq_( 1597 str((User.name == 'ed').compile( 1598 dialect=sa.engine.default.DefaultDialect())), 1599 "lower(users.name) = lower(:lower_1)") 1600 eq_( 1601 str((User.name.intersects('ed')).compile( 1602 dialect=sa.engine.default.DefaultDialect())), 1603 "users.name &= :name_1") 1604 1605 def test_reentrant_compile(self): 1606 users, Address, addresses, User = (self.tables.users, 1607 self.classes.Address, 1608 self.tables.addresses, 1609 self.classes.User) 1610 1611 class MyFakeProperty(sa.orm.properties.ColumnProperty): 1612 1613 def post_instrument_class(self, mapper): 1614 super(MyFakeProperty, self).post_instrument_class(mapper) 1615 configure_mappers() 1616 1617 m1 = mapper(User, users, properties={ 1618 'name': MyFakeProperty(users.c.name) 1619 }) 1620 m2 = mapper(Address, addresses) 1621 configure_mappers() 1622 1623 sa.orm.clear_mappers() 1624 1625 class MyFakeProperty(sa.orm.properties.ColumnProperty): 1626 1627 def post_instrument_class(self, mapper): 1628 super(MyFakeProperty, self).post_instrument_class(mapper) 1629 configure_mappers() 1630 1631 m1 = mapper(User, users, properties={ 1632 'name': MyFakeProperty(users.c.name) 1633 }) 1634 m2 = mapper(Address, addresses) 1635 configure_mappers() 1636 1637 def test_reconstructor(self): 1638 users = self.tables.users 1639 1640 recon = [] 1641 1642 class User(object): 1643 1644 @reconstructor 1645 def reconstruct(self): 1646 recon.append('go') 1647 1648 mapper(User, users) 1649 1650 User() 1651 eq_(recon, []) 1652 create_session().query(User).first() 1653 eq_(recon, ['go']) 1654 1655 def test_reconstructor_inheritance(self): 1656 users = self.tables.users 1657 1658 recon = [] 1659 1660 class A(object): 1661 1662 @reconstructor 1663 def reconstruct(self): 1664 assert isinstance(self, A) 1665 recon.append('A') 1666 1667 class B(A): 1668 1669 @reconstructor 1670 def reconstruct(self): 1671 assert isinstance(self, B) 1672 recon.append('B') 1673 1674 class C(A): 1675 1676 @reconstructor 1677 def reconstruct(self): 1678 assert isinstance(self, C) 1679 recon.append('C') 1680 1681 mapper(A, users, polymorphic_on=users.c.name, 1682 polymorphic_identity='jack') 1683 mapper(B, inherits=A, polymorphic_identity='ed') 1684 mapper(C, inherits=A, polymorphic_identity='chuck') 1685 1686 A() 1687 B() 1688 C() 1689 eq_(recon, []) 1690 1691 sess = create_session() 1692 sess.query(A).first() 1693 sess.query(B).first() 1694 sess.query(C).first() 1695 eq_(recon, ['A', 'B', 'C']) 1696 1697 def test_unmapped_reconstructor_inheritance(self): 1698 users = self.tables.users 1699 1700 recon = [] 1701 1702 class Base(object): 1703 1704 @reconstructor 1705 def reconstruct(self): 1706 recon.append('go') 1707 1708 class User(Base): 1709 pass 1710 1711 mapper(User, users) 1712 1713 User() 1714 eq_(recon, []) 1715 1716 create_session().query(User).first() 1717 eq_(recon, ['go']) 1718 1719 def test_unmapped_error(self): 1720 Address, addresses, users, User = (self.classes.Address, 1721 self.tables.addresses, 1722 self.tables.users, 1723 self.classes.User) 1724 1725 mapper(Address, addresses) 1726 sa.orm.clear_mappers() 1727 1728 mapper(User, users, properties={ 1729 'addresses': relationship(Address) 1730 }) 1731 1732 assert_raises_message( 1733 sa.orm.exc.UnmappedClassError, 1734 "Class 'test.orm._fixtures.Address' is not mapped", 1735 sa.orm.configure_mappers) 1736 1737 def test_unmapped_not_type_error(self): 1738 assert_raises_message( 1739 sa.exc.ArgumentError, 1740 "Class object expected, got '5'.", 1741 class_mapper, 5 1742 ) 1743 1744 def test_unmapped_not_type_error_iter_ok(self): 1745 assert_raises_message( 1746 sa.exc.ArgumentError, 1747 r"Class object expected, got '\(5, 6\)'.", 1748 class_mapper, (5, 6) 1749 ) 1750 1751 def test_attribute_error_raised_class_mapper(self): 1752 users = self.tables.users 1753 addresses = self.tables.addresses 1754 User = self.classes.User 1755 Address = self.classes.Address 1756 1757 mapper(User, users, properties={ 1758 "addresses": relationship( 1759 Address, 1760 primaryjoin=lambda: users.c.id == addresses.wrong.user_id) 1761 }) 1762 mapper(Address, addresses) 1763 assert_raises_message( 1764 AttributeError, 1765 "'Table' object has no attribute 'wrong'", 1766 class_mapper, Address 1767 ) 1768 1769 def test_key_error_raised_class_mapper(self): 1770 users = self.tables.users 1771 addresses = self.tables.addresses 1772 User = self.classes.User 1773 Address = self.classes.Address 1774 1775 mapper(User, users, properties={ 1776 "addresses": relationship(Address, 1777 primaryjoin=lambda: users.c.id == 1778 addresses.__dict__['wrong'].user_id) 1779 }) 1780 mapper(Address, addresses) 1781 assert_raises_message( 1782 KeyError, 1783 "wrong", 1784 class_mapper, Address 1785 ) 1786 1787 def test_unmapped_subclass_error_postmap(self): 1788 users = self.tables.users 1789 1790 class Base(object): 1791 pass 1792 1793 class Sub(Base): 1794 pass 1795 1796 mapper(Base, users) 1797 sa.orm.configure_mappers() 1798 1799 # we can create new instances, set attributes. 1800 s = Sub() 1801 s.name = 'foo' 1802 eq_(s.name, 'foo') 1803 eq_( 1804 attributes.get_history(s, 'name'), 1805 (['foo'], (), ()) 1806 ) 1807 1808 # using it with an ORM operation, raises 1809 assert_raises(sa.orm.exc.UnmappedClassError, 1810 create_session().add, Sub()) 1811 1812 def test_unmapped_subclass_error_premap(self): 1813 users = self.tables.users 1814 1815 class Base(object): 1816 pass 1817 1818 mapper(Base, users) 1819 1820 class Sub(Base): 1821 pass 1822 1823 sa.orm.configure_mappers() 1824 1825 # we can create new instances, set attributes. 1826 s = Sub() 1827 s.name = 'foo' 1828 eq_(s.name, 'foo') 1829 eq_( 1830 attributes.get_history(s, 'name'), 1831 (['foo'], (), ()) 1832 ) 1833 1834 # using it with an ORM operation, raises 1835 assert_raises(sa.orm.exc.UnmappedClassError, 1836 create_session().add, Sub()) 1837 1838 def test_oldstyle_mixin(self): 1839 users = self.tables.users 1840 1841 class OldStyle: 1842 pass 1843 1844 class NewStyle(object): 1845 pass 1846 1847 class A(NewStyle, OldStyle): 1848 pass 1849 1850 mapper(A, users) 1851 1852 class B(OldStyle, NewStyle): 1853 pass 1854 1855 mapper(B, users) 1856 1857 1858class DocumentTest(fixtures.TestBase): 1859 1860 def test_doc_propagate(self): 1861 metadata = MetaData() 1862 t1 = Table('t1', metadata, 1863 Column('col1', Integer, primary_key=True, 1864 doc="primary key column"), 1865 Column('col2', String, doc="data col"), 1866 Column('col3', String, doc="data col 2"), 1867 Column('col4', String, doc="data col 3"), 1868 Column('col5', String), 1869 ) 1870 t2 = Table('t2', metadata, 1871 Column('col1', Integer, primary_key=True, 1872 doc="primary key column"), 1873 Column('col2', String, doc="data col"), 1874 Column('col3', Integer, ForeignKey('t1.col1'), 1875 doc="foreign key to t1.col1") 1876 ) 1877 1878 class Foo(object): 1879 pass 1880 1881 class Bar(object): 1882 pass 1883 1884 mapper(Foo, t1, properties={ 1885 'bars': relationship(Bar, 1886 doc="bar relationship", 1887 backref=backref('foo', doc='foo relationship') 1888 ), 1889 'foober': column_property(t1.c.col3, doc='alternate data col'), 1890 'hoho': synonym("col4", doc="syn of col4") 1891 }) 1892 mapper(Bar, t2) 1893 configure_mappers() 1894 eq_(Foo.col1.__doc__, "primary key column") 1895 eq_(Foo.col2.__doc__, "data col") 1896 eq_(Foo.col5.__doc__, None) 1897 eq_(Foo.foober.__doc__, "alternate data col") 1898 eq_(Foo.bars.__doc__, "bar relationship") 1899 eq_(Foo.hoho.__doc__, "syn of col4") 1900 eq_(Bar.col1.__doc__, "primary key column") 1901 eq_(Bar.foo.__doc__, "foo relationship") 1902 1903 1904class ORMLoggingTest(_fixtures.FixtureTest): 1905 1906 def setup(self): 1907 self.buf = logging.handlers.BufferingHandler(100) 1908 for log in [ 1909 logging.getLogger('sqlalchemy.orm'), 1910 ]: 1911 log.addHandler(self.buf) 1912 1913 def teardown(self): 1914 for log in [ 1915 logging.getLogger('sqlalchemy.orm'), 1916 ]: 1917 log.removeHandler(self.buf) 1918 1919 def _current_messages(self): 1920 return [b.getMessage() for b in self.buf.buffer] 1921 1922 def test_mapper_info_aliased(self): 1923 User, users = self.classes.User, self.tables.users 1924 tb = users.select().alias() 1925 mapper(User, tb) 1926 s = Session() 1927 s.add(User(name='ed')) 1928 s.commit() 1929 1930 for msg in self._current_messages(): 1931 assert msg.startswith('(User|%%(%d anon)s) ' % id(tb)) 1932 1933 1934class OptionsTest(_fixtures.FixtureTest): 1935 1936 def test_synonym_options(self): 1937 Address, addresses, users, User = (self.classes.Address, 1938 self.tables.addresses, 1939 self.tables.users, 1940 self.classes.User) 1941 1942 mapper(User, users, properties=dict( 1943 addresses=relationship(mapper(Address, addresses), lazy='select', 1944 order_by=addresses.c.id), 1945 adlist=synonym('addresses'))) 1946 1947 def go(): 1948 sess = create_session() 1949 u = (sess.query(User). 1950 order_by(User.id). 1951 options(sa.orm.joinedload('adlist')). 1952 filter_by(name='jack')).one() 1953 eq_(u.adlist, 1954 [self.static.user_address_result[0].addresses[0]]) 1955 self.assert_sql_count(testing.db, go, 1) 1956 1957 def test_eager_options(self): 1958 """A lazy relationship can be upgraded to an eager relationship.""" 1959 1960 Address, addresses, users, User = (self.classes.Address, 1961 self.tables.addresses, 1962 self.tables.users, 1963 self.classes.User) 1964 1965 mapper(User, users, properties=dict( 1966 addresses=relationship(mapper(Address, addresses), 1967 order_by=addresses.c.id))) 1968 1969 sess = create_session() 1970 result = (sess.query(User). 1971 order_by(User.id). 1972 options(sa.orm.joinedload('addresses'))).all() 1973 1974 def go(): 1975 eq_(result, self.static.user_address_result) 1976 self.sql_count_(0, go) 1977 1978 def test_eager_options_with_limit(self): 1979 Address, addresses, users, User = (self.classes.Address, 1980 self.tables.addresses, 1981 self.tables.users, 1982 self.classes.User) 1983 1984 mapper(User, users, properties=dict( 1985 addresses=relationship(mapper(Address, addresses), lazy='select'))) 1986 1987 sess = create_session() 1988 u = (sess.query(User). 1989 options(sa.orm.joinedload('addresses')). 1990 filter_by(id=8)).one() 1991 1992 def go(): 1993 eq_(u.id, 8) 1994 eq_(len(u.addresses), 3) 1995 self.sql_count_(0, go) 1996 1997 sess.expunge_all() 1998 1999 u = sess.query(User).filter_by(id=8).one() 2000 eq_(u.id, 8) 2001 eq_(len(u.addresses), 3) 2002 2003 def test_lazy_options_with_limit(self): 2004 Address, addresses, users, User = (self.classes.Address, 2005 self.tables.addresses, 2006 self.tables.users, 2007 self.classes.User) 2008 2009 mapper(User, users, properties=dict( 2010 addresses=relationship(mapper(Address, addresses), lazy='joined'))) 2011 2012 sess = create_session() 2013 u = (sess.query(User). 2014 options(sa.orm.lazyload('addresses')). 2015 filter_by(id=8)).one() 2016 2017 def go(): 2018 eq_(u.id, 8) 2019 eq_(len(u.addresses), 3) 2020 self.sql_count_(1, go) 2021 2022 def test_eager_degrade(self): 2023 """An eager relationship automatically degrades to a lazy relationship 2024 if eager columns are not available""" 2025 2026 Address, addresses, users, User = (self.classes.Address, 2027 self.tables.addresses, 2028 self.tables.users, 2029 self.classes.User) 2030 2031 mapper(User, users, properties=dict( 2032 addresses=relationship(mapper(Address, addresses), 2033 lazy='joined', order_by=addresses.c.id))) 2034 2035 sess = create_session() 2036 # first test straight eager load, 1 statement 2037 2038 def go(): 2039 result = sess.query(User).order_by(User.id).all() 2040 eq_(result, self.static.user_address_result) 2041 self.sql_count_(1, go) 2042 2043 sess.expunge_all() 2044 2045 # then select just from users. run it into instances. 2046 # then assert the data, which will launch 3 more lazy loads 2047 # (previous users in session fell out of scope and were removed from 2048 # session's identity map) 2049 r = users.select().order_by(users.c.id).execute() 2050 2051 def go(): 2052 result = list(sess.query(User).instances(r)) 2053 eq_(result, self.static.user_address_result) 2054 self.sql_count_(4, go) 2055 2056 def test_eager_degrade_deep(self): 2057 users, Keyword, items, order_items, orders, \ 2058 Item, User, Address, keywords, item_keywords, Order, addresses = ( 2059 self.tables.users, 2060 self.classes.Keyword, 2061 self.tables.items, 2062 self.tables.order_items, 2063 self.tables.orders, 2064 self.classes.Item, 2065 self.classes.User, 2066 self.classes.Address, 2067 self.tables.keywords, 2068 self.tables.item_keywords, 2069 self.classes.Order, 2070 self.tables.addresses) 2071 2072 # test with a deeper set of eager loads. when we first load the three 2073 # users, they will have no addresses or orders. the number of lazy 2074 # loads when traversing the whole thing will be three for the 2075 # addresses and three for the orders. 2076 mapper(Address, addresses) 2077 2078 mapper(Keyword, keywords) 2079 2080 mapper(Item, items, properties=dict( 2081 keywords=relationship(Keyword, secondary=item_keywords, 2082 lazy='joined', 2083 order_by=item_keywords.c.keyword_id))) 2084 2085 mapper(Order, orders, properties=dict( 2086 items=relationship(Item, secondary=order_items, lazy='joined', 2087 order_by=order_items.c.item_id))) 2088 2089 mapper(User, users, properties=dict( 2090 addresses=relationship(Address, lazy='joined', 2091 order_by=addresses.c.id), 2092 orders=relationship(Order, lazy='joined', 2093 order_by=orders.c.id))) 2094 2095 sess = create_session() 2096 2097 # first test straight eager load, 1 statement 2098 def go(): 2099 result = sess.query(User).order_by(User.id).all() 2100 eq_(result, self.static.user_all_result) 2101 self.assert_sql_count(testing.db, go, 1) 2102 2103 sess.expunge_all() 2104 2105 # then select just from users. run it into instances. 2106 # then assert the data, which will launch 6 more lazy loads 2107 r = users.select().execute() 2108 2109 def go(): 2110 result = list(sess.query(User).instances(r)) 2111 eq_(result, self.static.user_all_result) 2112 self.assert_sql_count(testing.db, go, 6) 2113 2114 def test_lazy_options(self): 2115 """An eager relationship can be upgraded to a lazy relationship.""" 2116 2117 Address, addresses, users, User = (self.classes.Address, 2118 self.tables.addresses, 2119 self.tables.users, 2120 self.classes.User) 2121 2122 mapper(User, users, properties=dict( 2123 addresses=relationship(mapper(Address, addresses), lazy='joined') 2124 )) 2125 2126 sess = create_session() 2127 result = (sess.query(User). 2128 order_by(User.id). 2129 options(sa.orm.lazyload('addresses'))).all() 2130 2131 def go(): 2132 eq_(result, self.static.user_address_result) 2133 self.sql_count_(4, go) 2134 2135 def test_option_propagate(self): 2136 users, items, order_items, Order, Item, User, orders = ( 2137 self.tables.users, 2138 self.tables.items, 2139 self.tables.order_items, 2140 self.classes.Order, 2141 self.classes.Item, 2142 self.classes.User, 2143 self.tables.orders) 2144 2145 mapper(User, users, properties=dict( 2146 orders=relationship(Order) 2147 )) 2148 mapper(Order, orders, properties=dict( 2149 items=relationship(Item, secondary=order_items) 2150 )) 2151 mapper(Item, items) 2152 2153 sess = create_session() 2154 2155 oalias = aliased(Order) 2156 opt1 = sa.orm.joinedload(User.orders, Order.items) 2157 opt2 = sa.orm.contains_eager(User.orders, Order.items, alias=oalias) 2158 u1 = sess.query(User).join(oalias, User.orders).\ 2159 options(opt1, opt2).first() 2160 ustate = attributes.instance_state(u1) 2161 assert opt1 in ustate.load_options 2162 assert opt2 not in ustate.load_options 2163 2164 2165class DeepOptionsTest(_fixtures.FixtureTest): 2166 2167 @classmethod 2168 def setup_mappers(cls): 2169 users, Keyword, items, order_items, Order, Item, User, \ 2170 keywords, item_keywords, orders = ( 2171 cls.tables.users, 2172 cls.classes.Keyword, 2173 cls.tables.items, 2174 cls.tables.order_items, 2175 cls.classes.Order, 2176 cls.classes.Item, 2177 cls.classes.User, 2178 cls.tables.keywords, 2179 cls.tables.item_keywords, 2180 cls.tables.orders) 2181 2182 mapper(Keyword, keywords) 2183 2184 mapper(Item, items, properties=dict( 2185 keywords=relationship(Keyword, item_keywords, 2186 order_by=item_keywords.c.item_id))) 2187 2188 mapper(Order, orders, properties=dict( 2189 items=relationship(Item, order_items, 2190 order_by=items.c.id))) 2191 2192 mapper(User, users, properties=dict( 2193 orders=relationship(Order, order_by=orders.c.id))) 2194 2195 def test_deep_options_1(self): 2196 User = self.classes.User 2197 2198 sess = create_session() 2199 2200 # joinedload nothing. 2201 u = sess.query(User).order_by(User.id).all() 2202 2203 def go(): 2204 u[0].orders[1].items[0].keywords[1] 2205 self.assert_sql_count(testing.db, go, 3) 2206 2207 def test_deep_options_2(self): 2208 """test (joined|subquery)load_all() options""" 2209 2210 User = self.classes.User 2211 2212 sess = create_session() 2213 2214 result = (sess.query(User). 2215 order_by(User.id). 2216 options( 2217 sa.orm.joinedload_all('orders.items.keywords'))).all() 2218 2219 def go(): 2220 result[0].orders[1].items[0].keywords[1] 2221 self.sql_count_(0, go) 2222 2223 sess = create_session() 2224 2225 result = (sess.query(User). 2226 options( 2227 sa.orm.subqueryload_all('orders.items.keywords'))).all() 2228 2229 def go(): 2230 result[0].orders[1].items[0].keywords[1] 2231 self.sql_count_(0, go) 2232 2233 def test_deep_options_3(self): 2234 User = self.classes.User 2235 2236 sess = create_session() 2237 2238 # same thing, with separate options calls 2239 q2 = (sess.query(User). 2240 order_by(User.id). 2241 options(sa.orm.joinedload('orders')). 2242 options(sa.orm.joinedload('orders.items')). 2243 options(sa.orm.joinedload('orders.items.keywords'))) 2244 u = q2.all() 2245 2246 def go(): 2247 u[0].orders[1].items[0].keywords[1] 2248 self.sql_count_(0, go) 2249 2250 def test_deep_options_4(self): 2251 Item, User, Order = (self.classes.Item, 2252 self.classes.User, 2253 self.classes.Order) 2254 2255 sess = create_session() 2256 2257 assert_raises_message( 2258 sa.exc.ArgumentError, 2259 "Can't find property 'items' on any entity " 2260 "specified in this Query.", 2261 sess.query(User).options, sa.orm.joinedload(Order.items)) 2262 2263 # joinedload "keywords" on items. it will lazy load "orders", then 2264 # lazy load the "items" on the order, but on "items" it will eager 2265 # load the "keywords" 2266 q3 = sess.query(User).order_by(User.id).options( 2267 sa.orm.joinedload('orders.items.keywords')) 2268 u = q3.all() 2269 2270 def go(): 2271 u[0].orders[1].items[0].keywords[1] 2272 self.sql_count_(2, go) 2273 2274 sess = create_session() 2275 q3 = sess.query(User).order_by(User.id).options( 2276 sa.orm.joinedload(User.orders, Order.items, Item.keywords)) 2277 u = q3.all() 2278 2279 def go(): 2280 u[0].orders[1].items[0].keywords[1] 2281 self.sql_count_(2, go) 2282 2283 2284class ComparatorFactoryTest(_fixtures.FixtureTest, AssertsCompiledSQL): 2285 2286 def test_kwarg_accepted(self): 2287 users, Address = self.tables.users, self.classes.Address 2288 2289 class DummyComposite(object): 2290 2291 def __init__(self, x, y): 2292 pass 2293 2294 from sqlalchemy.orm.interfaces import PropComparator 2295 2296 class MyFactory(PropComparator): 2297 pass 2298 2299 for args in ( 2300 (column_property, users.c.name), 2301 (deferred, users.c.name), 2302 (synonym, 'name'), 2303 (composite, DummyComposite, users.c.id, users.c.name), 2304 (relationship, Address), 2305 (backref, 'address'), 2306 (comparable_property, ), 2307 (dynamic_loader, Address) 2308 ): 2309 fn = args[0] 2310 args = args[1:] 2311 fn(comparator_factory=MyFactory, *args) 2312 2313 def test_column(self): 2314 User, users = self.classes.User, self.tables.users 2315 2316 from sqlalchemy.orm.properties import ColumnProperty 2317 2318 class MyFactory(ColumnProperty.Comparator): 2319 __hash__ = None 2320 2321 def __eq__(self, other): 2322 return func.foobar(self.__clause_element__()) == \ 2323 func.foobar(other) 2324 mapper( 2325 User, users, 2326 properties={ 2327 'name': column_property( 2328 users.c.name, comparator_factory=MyFactory)}) 2329 self.assert_compile( 2330 User.name == 'ed', 2331 "foobar(users.name) = foobar(:foobar_1)", 2332 dialect=default.DefaultDialect() 2333 ) 2334 self.assert_compile( 2335 aliased(User).name == 'ed', 2336 "foobar(users_1.name) = foobar(:foobar_1)", 2337 dialect=default.DefaultDialect()) 2338 2339 def test_synonym(self): 2340 users, User = self.tables.users, self.classes.User 2341 2342 from sqlalchemy.orm.properties import ColumnProperty 2343 2344 class MyFactory(ColumnProperty.Comparator): 2345 __hash__ = None 2346 2347 def __eq__(self, other): 2348 return func.foobar(self.__clause_element__()) ==\ 2349 func.foobar(other) 2350 2351 mapper(User, users, properties={ 2352 'name': synonym('_name', map_column=True, 2353 comparator_factory=MyFactory) 2354 }) 2355 self.assert_compile( 2356 User.name == 'ed', 2357 "foobar(users.name) = foobar(:foobar_1)", 2358 dialect=default.DefaultDialect()) 2359 2360 self.assert_compile( 2361 aliased(User).name == 'ed', 2362 "foobar(users_1.name) = foobar(:foobar_1)", 2363 dialect=default.DefaultDialect()) 2364 2365 def test_relationship(self): 2366 users, Address, addresses, User = (self.tables.users, 2367 self.classes.Address, 2368 self.tables.addresses, 2369 self.classes.User) 2370 2371 from sqlalchemy.orm.properties import RelationshipProperty 2372 2373 # NOTE: this API changed in 0.8, previously __clause_element__() 2374 # gave the parent selecatable, now it gives the 2375 # primaryjoin/secondaryjoin 2376 class MyFactory(RelationshipProperty.Comparator): 2377 __hash__ = None 2378 2379 def __eq__(self, other): 2380 return func.foobar(self._source_selectable().c.user_id) == \ 2381 func.foobar(other.id) 2382 2383 class MyFactory2(RelationshipProperty.Comparator): 2384 __hash__ = None 2385 2386 def __eq__(self, other): 2387 return func.foobar(self._source_selectable().c.id) == \ 2388 func.foobar(other.user_id) 2389 2390 mapper(User, users) 2391 mapper(Address, addresses, properties={ 2392 'user': relationship( 2393 User, comparator_factory=MyFactory, 2394 backref=backref("addresses", comparator_factory=MyFactory2) 2395 ) 2396 } 2397 ) 2398 2399 # these are kind of nonsensical tests. 2400 self.assert_compile(Address.user == User(id=5), 2401 "foobar(addresses.user_id) = foobar(:foobar_1)", 2402 dialect=default.DefaultDialect()) 2403 self.assert_compile(User.addresses == Address(id=5, user_id=7), 2404 "foobar(users.id) = foobar(:foobar_1)", 2405 dialect=default.DefaultDialect()) 2406 2407 self.assert_compile( 2408 aliased(Address).user == User(id=5), 2409 "foobar(addresses_1.user_id) = foobar(:foobar_1)", 2410 dialect=default.DefaultDialect()) 2411 2412 self.assert_compile( 2413 aliased(User).addresses == Address(id=5, user_id=7), 2414 "foobar(users_1.id) = foobar(:foobar_1)", 2415 dialect=default.DefaultDialect()) 2416 2417 2418class SecondaryOptionsTest(fixtures.MappedTest): 2419 2420 """test that the contains_eager() option doesn't bleed 2421 into a secondary load.""" 2422 2423 run_inserts = 'once' 2424 2425 run_deletes = None 2426 2427 @classmethod 2428 def define_tables(cls, metadata): 2429 Table("base", metadata, 2430 Column('id', Integer, primary_key=True), 2431 Column('type', String(50), nullable=False) 2432 ) 2433 Table("child1", metadata, 2434 Column('id', Integer, ForeignKey('base.id'), primary_key=True), 2435 Column( 2436 'child2id', Integer, ForeignKey('child2.id'), nullable=False) 2437 ) 2438 Table("child2", metadata, 2439 Column('id', Integer, ForeignKey('base.id'), primary_key=True), 2440 ) 2441 Table('related', metadata, 2442 Column('id', Integer, ForeignKey('base.id'), primary_key=True), 2443 ) 2444 2445 @classmethod 2446 def setup_mappers(cls): 2447 child1, child2, base, related = (cls.tables.child1, 2448 cls.tables.child2, 2449 cls.tables.base, 2450 cls.tables.related) 2451 2452 class Base(cls.Comparable): 2453 pass 2454 2455 class Child1(Base): 2456 pass 2457 2458 class Child2(Base): 2459 pass 2460 2461 class Related(cls.Comparable): 2462 pass 2463 mapper(Base, base, polymorphic_on=base.c.type, properties={ 2464 'related': relationship(Related, uselist=False) 2465 }) 2466 mapper(Child1, child1, inherits=Base, 2467 polymorphic_identity='child1', 2468 properties={ 2469 'child2': relationship( 2470 Child2, 2471 primaryjoin=child1.c.child2id == base.c.id, 2472 foreign_keys=child1.c.child2id) 2473 }) 2474 mapper(Child2, child2, inherits=Base, polymorphic_identity='child2') 2475 mapper(Related, related) 2476 2477 @classmethod 2478 def insert_data(cls): 2479 child1, child2, base, related = (cls.tables.child1, 2480 cls.tables.child2, 2481 cls.tables.base, 2482 cls.tables.related) 2483 2484 base.insert().execute([ 2485 {'id': 1, 'type': 'child1'}, 2486 {'id': 2, 'type': 'child1'}, 2487 {'id': 3, 'type': 'child1'}, 2488 {'id': 4, 'type': 'child2'}, 2489 {'id': 5, 'type': 'child2'}, 2490 {'id': 6, 'type': 'child2'}, 2491 ]) 2492 child2.insert().execute([ 2493 {'id': 4}, 2494 {'id': 5}, 2495 {'id': 6}, 2496 ]) 2497 child1.insert().execute([ 2498 {'id': 1, 'child2id': 4}, 2499 {'id': 2, 'child2id': 5}, 2500 {'id': 3, 'child2id': 6}, 2501 ]) 2502 related.insert().execute([ 2503 {'id': 1}, 2504 {'id': 2}, 2505 {'id': 3}, 2506 {'id': 4}, 2507 {'id': 5}, 2508 {'id': 6}, 2509 ]) 2510 2511 def test_contains_eager(self): 2512 Child1, Related = self.classes.Child1, self.classes.Related 2513 2514 sess = create_session() 2515 2516 child1s = sess.query(Child1).\ 2517 join(Child1.related).\ 2518 options(sa.orm.contains_eager(Child1.related)).\ 2519 order_by(Child1.id) 2520 2521 def go(): 2522 eq_( 2523 child1s.all(), 2524 [ 2525 Child1(id=1, related=Related(id=1)), 2526 Child1(id=2, related=Related(id=2)), 2527 Child1(id=3, related=Related(id=3)) 2528 ] 2529 ) 2530 self.assert_sql_count(testing.db, go, 1) 2531 2532 c1 = child1s[0] 2533 2534 self.assert_sql_execution( 2535 testing.db, 2536 lambda: c1.child2, 2537 CompiledSQL( 2538 "SELECT child2.id AS child2_id, base.id AS base_id, " 2539 "base.type AS base_type " 2540 "FROM base JOIN child2 ON base.id = child2.id " 2541 "WHERE base.id = :param_1", 2542 {'param_1': 4} 2543 ) 2544 ) 2545 2546 def test_joinedload_on_other(self): 2547 Child1, Related = self.classes.Child1, self.classes.Related 2548 2549 sess = create_session() 2550 2551 child1s = sess.query(Child1).join(Child1.related).options( 2552 sa.orm.joinedload(Child1.related)).order_by(Child1.id) 2553 2554 def go(): 2555 eq_( 2556 child1s.all(), 2557 [Child1(id=1, related=Related(id=1)), 2558 Child1(id=2, related=Related(id=2)), 2559 Child1(id=3, related=Related(id=3))] 2560 ) 2561 self.assert_sql_count(testing.db, go, 1) 2562 2563 c1 = child1s[0] 2564 2565 self.assert_sql_execution( 2566 testing.db, 2567 lambda: c1.child2, 2568 CompiledSQL( 2569 "SELECT child2.id AS child2_id, base.id AS base_id, " 2570 "base.type AS base_type " 2571 "FROM base JOIN child2 ON base.id = child2.id " 2572 "WHERE base.id = :param_1", 2573 2574 {'param_1': 4} 2575 ) 2576 ) 2577 2578 def test_joinedload_on_same(self): 2579 Child1, Child2, Related = (self.classes.Child1, 2580 self.classes.Child2, 2581 self.classes.Related) 2582 2583 sess = create_session() 2584 2585 child1s = sess.query(Child1).join(Child1.related).options( 2586 sa.orm.joinedload(Child1.child2, Child2.related) 2587 ).order_by(Child1.id) 2588 2589 def go(): 2590 eq_( 2591 child1s.all(), 2592 [Child1(id=1, related=Related(id=1)), 2593 Child1(id=2, related=Related(id=2)), 2594 Child1(id=3, related=Related(id=3))] 2595 ) 2596 self.assert_sql_count(testing.db, go, 4) 2597 2598 c1 = child1s[0] 2599 2600 # this *does* joinedload 2601 self.assert_sql_execution( 2602 testing.db, 2603 lambda: c1.child2, 2604 CompiledSQL( 2605 "SELECT child2.id AS child2_id, base.id AS base_id, " 2606 "base.type AS base_type, " 2607 "related_1.id AS related_1_id FROM base JOIN child2 " 2608 "ON base.id = child2.id " 2609 "LEFT OUTER JOIN related AS related_1 " 2610 "ON base.id = related_1.id WHERE base.id = :param_1", 2611 {'param_1': 4} 2612 ) 2613 ) 2614 2615 2616class DeferredPopulationTest(fixtures.MappedTest): 2617 2618 @classmethod 2619 def define_tables(cls, metadata): 2620 Table("thing", metadata, 2621 Column( 2622 "id", Integer, primary_key=True, 2623 test_needs_autoincrement=True), 2624 Column("name", String(20))) 2625 2626 Table("human", metadata, 2627 Column( 2628 "id", Integer, primary_key=True, 2629 test_needs_autoincrement=True), 2630 Column("thing_id", Integer, ForeignKey("thing.id")), 2631 Column("name", String(20))) 2632 2633 @classmethod 2634 def setup_mappers(cls): 2635 thing, human = cls.tables.thing, cls.tables.human 2636 2637 class Human(cls.Basic): 2638 pass 2639 2640 class Thing(cls.Basic): 2641 pass 2642 2643 mapper(Human, human, properties={"thing": relationship(Thing)}) 2644 mapper(Thing, thing, properties={"name": deferred(thing.c.name)}) 2645 2646 @classmethod 2647 def insert_data(cls): 2648 thing, human = cls.tables.thing, cls.tables.human 2649 2650 thing.insert().execute([ 2651 {"id": 1, "name": "Chair"}, 2652 ]) 2653 2654 human.insert().execute([ 2655 {"id": 1, "thing_id": 1, "name": "Clark Kent"}, 2656 ]) 2657 2658 def _test(self, thing): 2659 assert "name" in attributes.instance_state(thing).dict 2660 2661 def test_no_previous_query(self): 2662 Thing = self.classes.Thing 2663 2664 session = create_session() 2665 thing = session.query(Thing).options(sa.orm.undefer("name")).first() 2666 self._test(thing) 2667 2668 def test_query_twice_with_clear(self): 2669 Thing = self.classes.Thing 2670 2671 session = create_session() 2672 result = session.query(Thing).first() # noqa 2673 session.expunge_all() 2674 thing = session.query(Thing).options(sa.orm.undefer("name")).first() 2675 self._test(thing) 2676 2677 def test_query_twice_no_clear(self): 2678 Thing = self.classes.Thing 2679 2680 session = create_session() 2681 result = session.query(Thing).first() # noqa 2682 thing = session.query(Thing).options(sa.orm.undefer("name")).first() 2683 self._test(thing) 2684 2685 def test_joinedload_with_clear(self): 2686 Thing, Human = self.classes.Thing, self.classes.Human 2687 2688 session = create_session() 2689 human = session.query(Human).options( # noqa 2690 sa.orm.joinedload("thing")).first() 2691 session.expunge_all() 2692 thing = session.query(Thing).options(sa.orm.undefer("name")).first() 2693 self._test(thing) 2694 2695 def test_joinedload_no_clear(self): 2696 Thing, Human = self.classes.Thing, self.classes.Human 2697 2698 session = create_session() 2699 human = session.query(Human).options( # noqa 2700 sa.orm.joinedload("thing")).first() 2701 thing = session.query(Thing).options(sa.orm.undefer("name")).first() 2702 self._test(thing) 2703 2704 def test_join_with_clear(self): 2705 Thing, Human = self.classes.Thing, self.classes.Human 2706 2707 session = create_session() 2708 result = session.query(Human).add_entity( # noqa 2709 Thing).join("thing").first() 2710 session.expunge_all() 2711 thing = session.query(Thing).options(sa.orm.undefer("name")).first() 2712 self._test(thing) 2713 2714 def test_join_no_clear(self): 2715 Thing, Human = self.classes.Thing, self.classes.Human 2716 2717 session = create_session() 2718 result = session.query(Human).add_entity( # noqa 2719 Thing).join("thing").first() 2720 thing = session.query(Thing).options(sa.orm.undefer("name")).first() 2721 self._test(thing) 2722 2723 2724class NoLoadTest(_fixtures.FixtureTest): 2725 run_inserts = 'once' 2726 run_deletes = None 2727 2728 def test_o2m_noload(self): 2729 2730 Address, addresses, users, User = ( 2731 self.classes.Address, 2732 self.tables.addresses, 2733 self.tables.users, 2734 self.classes.User) 2735 2736 m = mapper(User, users, properties=dict( 2737 addresses=relationship(mapper(Address, addresses), lazy='noload') 2738 )) 2739 q = create_session().query(m) 2740 result = [None] 2741 2742 def go(): 2743 x = q.filter(User.id == 7).all() 2744 x[0].addresses 2745 result[0] = x 2746 self.assert_sql_count(testing.db, go, 1) 2747 2748 self.assert_result( 2749 result[0], User, 2750 {'id': 7, 'addresses': (Address, [])}, 2751 ) 2752 2753 def test_upgrade_o2m_noload_lazyload_option(self): 2754 Address, addresses, users, User = ( 2755 self.classes.Address, 2756 self.tables.addresses, 2757 self.tables.users, 2758 self.classes.User) 2759 2760 m = mapper(User, users, properties=dict( 2761 addresses=relationship(mapper(Address, addresses), lazy='noload') 2762 )) 2763 q = create_session().query(m).options(sa.orm.lazyload('addresses')) 2764 result = [None] 2765 2766 def go(): 2767 x = q.filter(User.id == 7).all() 2768 x[0].addresses 2769 result[0] = x 2770 self.sql_count_(2, go) 2771 2772 self.assert_result( 2773 result[0], User, 2774 {'id': 7, 'addresses': (Address, [{'id': 1}])}, 2775 ) 2776 2777 def test_m2o_noload_option(self): 2778 Address, addresses, users, User = ( 2779 self.classes.Address, 2780 self.tables.addresses, 2781 self.tables.users, 2782 self.classes.User) 2783 mapper(Address, addresses, properties={ 2784 'user': relationship(User) 2785 }) 2786 mapper(User, users) 2787 s = Session() 2788 a1 = s.query(Address).filter_by(id=1).options( 2789 sa.orm.noload('user')).first() 2790 2791 def go(): 2792 eq_(a1.user, None) 2793 self.sql_count_(0, go) 2794 2795 2796class RaiseLoadTest(_fixtures.FixtureTest): 2797 run_inserts = 'once' 2798 run_deletes = None 2799 2800 def test_o2m_raiseload_mapper(self): 2801 Address, addresses, users, User = ( 2802 self.classes.Address, 2803 self.tables.addresses, 2804 self.tables.users, 2805 self.classes.User) 2806 2807 mapper(Address, addresses) 2808 mapper(User, users, properties=dict( 2809 addresses=relationship(Address, lazy='raise') 2810 )) 2811 q = create_session().query(User) 2812 result = [None] 2813 2814 def go(): 2815 x = q.filter(User.id == 7).all() 2816 assert_raises_message( 2817 sa.exc.InvalidRequestError, 2818 "'User.addresses' is not available due to lazy='raise'", 2819 lambda: x[0].addresses) 2820 result[0] = x 2821 self.assert_sql_count(testing.db, go, 1) 2822 2823 self.assert_result( 2824 result[0], User, 2825 {'id': 7}, 2826 ) 2827 2828 def test_o2m_raiseload_option(self): 2829 Address, addresses, users, User = ( 2830 self.classes.Address, 2831 self.tables.addresses, 2832 self.tables.users, 2833 self.classes.User) 2834 2835 mapper(Address, addresses) 2836 mapper(User, users, properties=dict( 2837 addresses=relationship(Address) 2838 )) 2839 q = create_session().query(User) 2840 result = [None] 2841 2842 def go(): 2843 x = q.options( 2844 sa.orm.raiseload(User.addresses)).filter(User.id == 7).all() 2845 assert_raises_message( 2846 sa.exc.InvalidRequestError, 2847 "'User.addresses' is not available due to lazy='raise'", 2848 lambda: x[0].addresses) 2849 result[0] = x 2850 self.assert_sql_count(testing.db, go, 1) 2851 2852 self.assert_result( 2853 result[0], User, 2854 {'id': 7}, 2855 ) 2856 2857 def test_o2m_raiseload_lazyload_option(self): 2858 Address, addresses, users, User = ( 2859 self.classes.Address, 2860 self.tables.addresses, 2861 self.tables.users, 2862 self.classes.User) 2863 2864 mapper(Address, addresses) 2865 mapper(User, users, properties=dict( 2866 addresses=relationship(Address, lazy='raise') 2867 )) 2868 q = create_session().query(User).options(sa.orm.lazyload('addresses')) 2869 result = [None] 2870 2871 def go(): 2872 x = q.filter(User.id == 7).all() 2873 x[0].addresses 2874 result[0] = x 2875 self.sql_count_(2, go) 2876 2877 self.assert_result( 2878 result[0], User, 2879 {'id': 7, 'addresses': (Address, [{'id': 1}])}, 2880 ) 2881 2882 def test_m2o_raiseload_option(self): 2883 Address, addresses, users, User = ( 2884 self.classes.Address, 2885 self.tables.addresses, 2886 self.tables.users, 2887 self.classes.User) 2888 mapper(Address, addresses, properties={ 2889 'user': relationship(User) 2890 }) 2891 mapper(User, users) 2892 s = Session() 2893 a1 = s.query(Address).filter_by(id=1).options( 2894 sa.orm.raiseload('user')).first() 2895 2896 def go(): 2897 assert_raises_message( 2898 sa.exc.InvalidRequestError, 2899 "'Address.user' is not available due to lazy='raise'", 2900 lambda: a1.user) 2901 2902 self.sql_count_(0, go) 2903 2904 def test_m2o_raise_on_sql_option(self): 2905 Address, addresses, users, User = ( 2906 self.classes.Address, 2907 self.tables.addresses, 2908 self.tables.users, 2909 self.classes.User) 2910 mapper(Address, addresses, properties={ 2911 'user': relationship(User) 2912 }) 2913 mapper(User, users) 2914 s = Session() 2915 a1 = s.query(Address).filter_by(id=1).options( 2916 sa.orm.raiseload('user', sql_only=True)).first() 2917 2918 def go(): 2919 assert_raises_message( 2920 sa.exc.InvalidRequestError, 2921 "'Address.user' is not available due to lazy='raise_on_sql'", 2922 lambda: a1.user) 2923 2924 self.sql_count_(0, go) 2925 2926 s.close() 2927 2928 u1 = s.query(User).first() 2929 a1 = s.query(Address).filter_by(id=1).options( 2930 sa.orm.raiseload('user', sql_only=True)).first() 2931 assert 'user' not in a1.__dict__ 2932 is_(a1.user, u1) 2933 2934 def test_m2o_non_use_get_raise_on_sql_option(self): 2935 Address, addresses, users, User = ( 2936 self.classes.Address, 2937 self.tables.addresses, 2938 self.tables.users, 2939 self.classes.User) 2940 mapper(Address, addresses, properties={ 2941 'user': relationship( 2942 User, 2943 primaryjoin=sa.and_( 2944 addresses.c.user_id == users.c.id, 2945 users.c.name != None # noqa 2946 ) 2947 ) 2948 }) 2949 mapper(User, users) 2950 s = Session() 2951 u1 = s.query(User).first() 2952 a1 = s.query(Address).filter_by(id=1).options( 2953 sa.orm.raiseload('user', sql_only=True)).first() 2954 2955 def go(): 2956 assert_raises_message( 2957 sa.exc.InvalidRequestError, 2958 "'Address.user' is not available due to lazy='raise_on_sql'", 2959 lambda: a1.user) 2960 2961 2962class RequirementsTest(fixtures.MappedTest): 2963 2964 """Tests the contract for user classes.""" 2965 2966 @classmethod 2967 def define_tables(cls, metadata): 2968 Table('ht1', metadata, 2969 Column( 2970 'id', Integer, primary_key=True, 2971 test_needs_autoincrement=True), 2972 Column('value', String(10))) 2973 Table('ht2', metadata, 2974 Column( 2975 'id', Integer, primary_key=True, 2976 test_needs_autoincrement=True), 2977 Column('ht1_id', Integer, ForeignKey('ht1.id')), 2978 Column('value', String(10))) 2979 Table('ht3', metadata, 2980 Column( 2981 'id', Integer, primary_key=True, 2982 test_needs_autoincrement=True), 2983 Column('value', String(10))) 2984 Table('ht4', metadata, 2985 Column('ht1_id', Integer, ForeignKey('ht1.id'), 2986 primary_key=True), 2987 Column('ht3_id', Integer, ForeignKey('ht3.id'), 2988 primary_key=True)) 2989 Table('ht5', metadata, 2990 Column('ht1_id', Integer, ForeignKey('ht1.id'), 2991 primary_key=True)) 2992 Table('ht6', metadata, 2993 Column('ht1a_id', Integer, ForeignKey('ht1.id'), 2994 primary_key=True), 2995 Column('ht1b_id', Integer, ForeignKey('ht1.id'), 2996 primary_key=True), 2997 Column('value', String(10))) 2998 2999 if util.py2k: 3000 def test_baseclass(self): 3001 ht1 = self.tables.ht1 3002 3003 class OldStyle: 3004 pass 3005 3006 assert_raises(sa.exc.ArgumentError, mapper, OldStyle, ht1) 3007 3008 assert_raises(sa.exc.ArgumentError, mapper, 123) 3009 3010 class NoWeakrefSupport(str): 3011 pass 3012 3013 # TODO: is weakref support detectable without an instance? 3014 # self.assertRaises( 3015 # sa.exc.ArgumentError, mapper, NoWeakrefSupport, t2) 3016 3017 class _ValueBase(object): 3018 3019 def __init__(self, value='abc', id=None): 3020 self.id = id 3021 self.value = value 3022 3023 def __bool__(self): 3024 return False 3025 3026 def __hash__(self): 3027 return hash(self.value) 3028 3029 def __eq__(self, other): 3030 if isinstance(other, type(self)): 3031 return self.value == other.value 3032 return False 3033 3034 def test_comparison_overrides(self): 3035 """Simple tests to ensure users can supply comparison __methods__. 3036 3037 The suite-level test --options are better suited to detect 3038 problems- they add selected __methods__ across the board on all 3039 ORM tests. This test simply shoves a variety of operations 3040 through the ORM to catch basic regressions early in a standard 3041 test run. 3042 """ 3043 3044 ht6, ht5, ht4, ht3, ht2, ht1 = (self.tables.ht6, 3045 self.tables.ht5, 3046 self.tables.ht4, 3047 self.tables.ht3, 3048 self.tables.ht2, 3049 self.tables.ht1) 3050 3051 class H1(self._ValueBase): 3052 pass 3053 3054 class H2(self._ValueBase): 3055 pass 3056 3057 class H3(self._ValueBase): 3058 pass 3059 3060 class H6(self._ValueBase): 3061 pass 3062 3063 mapper(H1, ht1, properties={ 3064 'h2s': relationship(H2, backref='h1'), 3065 'h3s': relationship(H3, secondary=ht4, backref='h1s'), 3066 'h1s': relationship(H1, secondary=ht5, backref='parent_h1'), 3067 't6a': relationship(H6, backref='h1a', 3068 primaryjoin=ht1.c.id == ht6.c.ht1a_id), 3069 't6b': relationship(H6, backref='h1b', 3070 primaryjoin=ht1.c.id == ht6.c.ht1b_id), 3071 }) 3072 mapper(H2, ht2) 3073 mapper(H3, ht3) 3074 mapper(H6, ht6) 3075 3076 s = create_session() 3077 s.add_all([ 3078 H1('abc'), 3079 H1('def'), 3080 ]) 3081 h1 = H1('ghi') 3082 s.add(h1) 3083 h1.h2s.append(H2('abc')) 3084 h1.h3s.extend([H3(), H3()]) 3085 h1.h1s.append(H1()) 3086 3087 s.flush() 3088 eq_(select([func.count('*')]).select_from(ht1).scalar(), 4) 3089 3090 h6 = H6() 3091 h6.h1a = h1 3092 h6.h1b = h1 3093 3094 h6 = H6() 3095 h6.h1a = h1 3096 h6.h1b = x = H1() 3097 assert x in s 3098 3099 h6.h1b.h2s.append(H2('def')) 3100 3101 s.flush() 3102 3103 h1.h2s.extend([H2('abc'), H2('def')]) 3104 s.flush() 3105 3106 h1s = s.query(H1).options(sa.orm.joinedload('h2s')).all() 3107 eq_(len(h1s), 5) 3108 3109 self.assert_unordered_result(h1s, H1, 3110 {'h2s': []}, 3111 {'h2s': []}, 3112 {'h2s': (H2, [{'value': 'abc'}, 3113 {'value': 'def'}, 3114 {'value': 'abc'}])}, 3115 {'h2s': []}, 3116 {'h2s': (H2, [{'value': 'def'}])}) 3117 3118 h1s = s.query(H1).options(sa.orm.joinedload('h3s')).all() 3119 3120 eq_(len(h1s), 5) 3121 h1s = s.query(H1).options(sa.orm.joinedload_all('t6a.h1b'), 3122 sa.orm.joinedload('h2s'), 3123 sa.orm.joinedload_all('h3s.h1s')).all() 3124 eq_(len(h1s), 5) 3125 3126 def test_composite_results(self): 3127 ht2, ht1 = (self.tables.ht2, 3128 self.tables.ht1) 3129 3130 class H1(self._ValueBase): 3131 3132 def __init__(self, value, id, h2s): 3133 self.value = value 3134 self.id = id 3135 self.h2s = h2s 3136 3137 class H2(self._ValueBase): 3138 3139 def __init__(self, value, id): 3140 self.value = value 3141 self.id = id 3142 3143 mapper(H1, ht1, properties={ 3144 'h2s': relationship(H2, backref='h1'), 3145 }) 3146 mapper(H2, ht2) 3147 s = Session() 3148 s.add_all([ 3149 H1('abc', 1, h2s=[ 3150 H2('abc', id=1), 3151 H2('def', id=2), 3152 H2('def', id=3), 3153 ]), 3154 H1('def', 2, h2s=[ 3155 H2('abc', id=4), 3156 H2('abc', id=5), 3157 H2('def', id=6), 3158 ]), 3159 ]) 3160 s.commit() 3161 eq_( 3162 [(h1.value, h1.id, h2.value, h2.id) 3163 for h1, h2 in 3164 s.query(H1, H2).join(H1.h2s).order_by(H1.id, H2.id)], 3165 [ 3166 ('abc', 1, 'abc', 1), 3167 ('abc', 1, 'def', 2), 3168 ('abc', 1, 'def', 3), 3169 ('def', 2, 'abc', 4), 3170 ('def', 2, 'abc', 5), 3171 ('def', 2, 'def', 6), 3172 ] 3173 ) 3174 3175 def test_nonzero_len_recursion(self): 3176 ht1 = self.tables.ht1 3177 3178 class H1(object): 3179 3180 def __len__(self): 3181 return len(self.get_value()) 3182 3183 def get_value(self): 3184 self.value = "foobar" 3185 return self.value 3186 3187 class H2(object): 3188 3189 def __bool__(self): 3190 return bool(self.get_value()) 3191 3192 def get_value(self): 3193 self.value = "foobar" 3194 return self.value 3195 3196 mapper(H1, ht1) 3197 mapper(H2, ht1) 3198 3199 h1 = H1() 3200 h1.value = "Asdf" 3201 h1.value = "asdf asdf" # ding 3202 3203 h2 = H2() 3204 h2.value = "Asdf" 3205 h2.value = "asdf asdf" # ding 3206 3207 3208class IsUserlandTest(fixtures.MappedTest): 3209 3210 @classmethod 3211 def define_tables(cls, metadata): 3212 Table('foo', metadata, 3213 Column('id', Integer, primary_key=True), 3214 Column('someprop', Integer) 3215 ) 3216 3217 def _test(self, value, instancelevel=None): 3218 class Foo(object): 3219 someprop = value 3220 3221 m = mapper(Foo, self.tables.foo) 3222 eq_(Foo.someprop, value) 3223 f1 = Foo() 3224 if instancelevel is not None: 3225 eq_(f1.someprop, instancelevel) 3226 else: 3227 eq_(f1.someprop, value) 3228 assert self.tables.foo.c.someprop not in m._columntoproperty 3229 3230 def _test_not(self, value): 3231 class Foo(object): 3232 someprop = value 3233 3234 m = mapper(Foo, self.tables.foo) 3235 is_(Foo.someprop.property.columns[0], self.tables.foo.c.someprop) 3236 assert self.tables.foo.c.someprop in m._columntoproperty 3237 3238 def test_string(self): 3239 self._test("someprop") 3240 3241 def test_unicode(self): 3242 self._test("someprop") 3243 3244 def test_int(self): 3245 self._test(5) 3246 3247 def test_dict(self): 3248 self._test({"bar": "bat"}) 3249 3250 def test_set(self): 3251 self._test(set([6])) 3252 3253 def test_column(self): 3254 self._test_not(self.tables.foo.c.someprop) 3255 3256 def test_relationship(self): 3257 self._test_not(relationship("bar")) 3258 3259 def test_descriptor(self): 3260 def somefunc(self): 3261 return "hi" 3262 self._test(property(somefunc), "hi") 3263 3264 3265class MagicNamesTest(fixtures.MappedTest): 3266 3267 @classmethod 3268 def define_tables(cls, metadata): 3269 Table('cartographers', metadata, 3270 Column('id', Integer, primary_key=True, 3271 test_needs_autoincrement=True), 3272 Column('name', String(50)), 3273 Column('alias', String(50)), 3274 Column('quip', String(100))) 3275 Table('maps', metadata, 3276 Column('id', Integer, primary_key=True, 3277 test_needs_autoincrement=True), 3278 Column('cart_id', Integer, 3279 ForeignKey('cartographers.id')), 3280 Column('state', String(2)), 3281 Column('data', sa.Text)) 3282 3283 @classmethod 3284 def setup_classes(cls): 3285 class Cartographer(cls.Basic): 3286 pass 3287 3288 class Map(cls.Basic): 3289 pass 3290 3291 def test_mappish(self): 3292 maps, Cartographer, cartographers, Map = (self.tables.maps, 3293 self.classes.Cartographer, 3294 self.tables.cartographers, 3295 self.classes.Map) 3296 3297 mapper(Cartographer, cartographers, properties=dict( 3298 query=cartographers.c.quip)) 3299 mapper(Map, maps, properties=dict( 3300 mapper=relationship(Cartographer, backref='maps'))) 3301 3302 c = Cartographer(name='Lenny', alias='The Dude', 3303 query='Where be dragons?') 3304 Map(state='AK', mapper=c) 3305 3306 sess = create_session() 3307 sess.add(c) 3308 sess.flush() 3309 sess.expunge_all() 3310 3311 for C, M in ((Cartographer, Map), 3312 (sa.orm.aliased(Cartographer), sa.orm.aliased(Map))): 3313 c1 = (sess.query(C). 3314 filter(C.alias == 'The Dude'). 3315 filter(C.query == 'Where be dragons?')).one() 3316 sess.query(M).filter(M.mapper == c1).one() 3317 3318 def test_direct_stateish(self): 3319 for reserved in (sa.orm.instrumentation.ClassManager.STATE_ATTR, 3320 sa.orm.instrumentation.ClassManager.MANAGER_ATTR): 3321 t = Table('t', sa.MetaData(), 3322 Column('id', Integer, primary_key=True, 3323 test_needs_autoincrement=True), 3324 Column(reserved, Integer)) 3325 3326 class T(object): 3327 pass 3328 assert_raises_message( 3329 KeyError, 3330 ('%r: requested attribute name conflicts with ' 3331 'instrumentation attribute of the same name.' % reserved), 3332 mapper, T, t) 3333 3334 def test_indirect_stateish(self): 3335 maps = self.tables.maps 3336 3337 for reserved in (sa.orm.instrumentation.ClassManager.STATE_ATTR, 3338 sa.orm.instrumentation.ClassManager.MANAGER_ATTR): 3339 class M(object): 3340 pass 3341 3342 assert_raises_message( 3343 KeyError, 3344 ('requested attribute name conflicts with ' 3345 'instrumentation attribute of the same name'), 3346 mapper, M, maps, properties={ 3347 reserved: maps.c.state}) 3348