1"""General mapper operations with an emphasis on selecting/loading."""
2
3from sqlalchemy.testing import assert_raises, assert_raises_message
4import sqlalchemy as sa
5from sqlalchemy import testing
6from sqlalchemy import MetaData, Integer, String, \
7    ForeignKey, func, util, select
8from sqlalchemy.testing.schema import Table, Column
9from sqlalchemy.engine import default
10from sqlalchemy.orm import mapper, relationship, backref, \
11    create_session, class_mapper, configure_mappers, reconstructor, \
12    aliased, deferred, synonym, attributes, \
13    column_property, composite, dynamic_loader, \
14    comparable_property, Session
15from sqlalchemy.orm.persistence import _sort_states
16from sqlalchemy.testing import eq_, AssertsCompiledSQL, is_
17from sqlalchemy.testing import fixtures
18from test.orm import _fixtures
19from sqlalchemy.testing.assertsql import CompiledSQL
20import logging
21import logging.handlers
22
23
24class MapperTest(_fixtures.FixtureTest, AssertsCompiledSQL):
25    __dialect__ = 'default'
26
27    def test_prop_shadow(self):
28        """A backref name may not shadow an existing property name."""
29
30        Address, addresses, users, User = (self.classes.Address,
31                                           self.tables.addresses,
32                                           self.tables.users,
33                                           self.classes.User)
34
35        mapper(Address, addresses)
36        mapper(User, users,
37               properties={
38                   'addresses': relationship(Address, backref='email_address')
39               })
40        assert_raises(sa.exc.ArgumentError, sa.orm.configure_mappers)
41
42    def test_update_attr_keys(self):
43        """test that update()/insert() use the correct key when given
44        InstrumentedAttributes."""
45
46        User, users = self.classes.User, self.tables.users
47
48        mapper(User, users, properties={
49            'foobar': users.c.name
50        })
51
52        users.insert().values({User.foobar: 'name1'}).execute()
53        eq_(sa.select([User.foobar]).where(User.foobar == 'name1').
54            execute().fetchall(), [('name1',)])
55
56        users.update().values({User.foobar: User.foobar + 'foo'}).execute()
57        eq_(sa.select([User.foobar]).where(User.foobar == 'name1foo').
58            execute().fetchall(), [('name1foo',)])
59
60    def test_utils(self):
61        users = self.tables.users
62        addresses = self.tables.addresses
63        Address = self.classes.Address
64
65        from sqlalchemy.orm.base import _is_mapped_class, _is_aliased_class
66
67        class Foo(object):
68            x = "something"
69
70            @property
71            def y(self):
72                return "something else"
73
74        m = mapper(Foo, users, properties={"addresses": relationship(Address)})
75        mapper(Address, addresses)
76        a1 = aliased(Foo)
77
78        f = Foo()
79
80        for fn, arg, ret in [
81            (_is_mapped_class, Foo.x, False),
82            (_is_mapped_class, Foo.y, False),
83            (_is_mapped_class, Foo.name, False),
84            (_is_mapped_class, Foo.addresses, False),
85            (_is_mapped_class, Foo, True),
86            (_is_mapped_class, f, False),
87            (_is_mapped_class, a1, True),
88            (_is_mapped_class, m, True),
89            (_is_aliased_class, a1, True),
90            (_is_aliased_class, Foo.x, False),
91            (_is_aliased_class, Foo.y, False),
92            (_is_aliased_class, Foo, False),
93            (_is_aliased_class, f, False),
94            (_is_aliased_class, a1, True),
95            (_is_aliased_class, m, False),
96        ]:
97            assert fn(arg) == ret
98
99    def test_entity_descriptor(self):
100        users = self.tables.users
101
102        from sqlalchemy.orm.base import _entity_descriptor
103
104        class Foo(object):
105            x = "something"
106
107            @property
108            def y(self):
109                return "something else"
110        m = mapper(Foo, users)
111        a1 = aliased(Foo)
112
113        for arg, key, ret in [
114            (m, "x", Foo.x),
115            (Foo, "x", Foo.x),
116            (a1, "x", a1.x),
117            (users, "name", users.c.name)
118        ]:
119            assert _entity_descriptor(arg, key) is ret
120
121    def test_friendly_attribute_str_on_uncompiled_boom(self):
122        User, users = self.classes.User, self.tables.users
123
124        def boom():
125            raise Exception("it broke")
126        mapper(User, users, properties={
127            'addresses': relationship(boom)
128        })
129
130        # test that QueryableAttribute.__str__() doesn't
131        # cause a compile.
132        eq_(str(User.addresses), "User.addresses")
133
134    def test_exceptions_sticky(self):
135        """test preservation of mapper compile errors raised during hasattr(),
136        as well as for redundant mapper compile calls.  Test that
137        repeated calls don't stack up error messages.
138
139        """
140
141        Address, addresses, User = (self.classes.Address,
142                                    self.tables.addresses,
143                                    self.classes.User)
144
145        mapper(Address, addresses, properties={
146            'user': relationship(User)
147        })
148
149        try:
150            hasattr(Address.user, 'property')
151        except sa.orm.exc.UnmappedClassError:
152            assert util.compat.py32
153
154        for i in range(3):
155            assert_raises_message(sa.exc.InvalidRequestError,
156                                  "^One or more "
157                                  "mappers failed to initialize - can't "
158                                  "proceed with initialization of other "
159                                  r"mappers. Triggering mapper\: "
160                                  r"'Mapper\|Address\|addresses'."
161                                  " Original exception was: Class "
162                                  "'test.orm._fixtures.User' is not mapped$",
163                                  configure_mappers)
164
165    def test_column_prefix(self):
166        users, User = self.tables.users, self.classes.User
167
168        mapper(User, users, column_prefix='_', properties={
169            'user_name': synonym('_name')
170        })
171
172        s = create_session()
173        u = s.query(User).get(7)
174        eq_(u._name, 'jack')
175        eq_(u._id, 7)
176        u2 = s.query(User).filter_by(user_name='jack').one()
177        assert u is u2
178
179    def test_no_pks_1(self):
180        User, users = self.classes.User, self.tables.users
181
182        s = sa.select([users.c.name]).alias('foo')
183        assert_raises(sa.exc.ArgumentError, mapper, User, s)
184
185    def test_no_pks_2(self):
186        User, users = self.classes.User, self.tables.users
187
188        s = sa.select([users.c.name]).alias()
189        assert_raises(sa.exc.ArgumentError, mapper, User, s)
190
191    def test_reconfigure_on_other_mapper(self):
192        """A configure trigger on an already-configured mapper
193        still triggers a check against all mappers."""
194
195        users, Address, addresses, User = (self.tables.users,
196                                           self.classes.Address,
197                                           self.tables.addresses,
198                                           self.classes.User)
199
200        mapper(User, users)
201        sa.orm.configure_mappers()
202        assert sa.orm.mapperlib.Mapper._new_mappers is False
203
204        m = mapper(Address, addresses, properties={
205            'user': relationship(User, backref="addresses")})
206
207        assert m.configured is False
208        assert sa.orm.mapperlib.Mapper._new_mappers is True
209        u = User()
210        assert User.addresses
211        assert sa.orm.mapperlib.Mapper._new_mappers is False
212
213    def test_configure_on_session(self):
214        User, users = self.classes.User, self.tables.users
215
216        m = mapper(User, users)
217        session = create_session()
218        session.connection(m)
219
220    def test_incomplete_columns(self):
221        """Loading from a select which does not contain all columns"""
222
223        addresses, Address = self.tables.addresses, self.classes.Address
224
225        mapper(Address, addresses)
226        s = create_session()
227        a = s.query(Address).from_statement(
228            sa.select([addresses.c.id, addresses.c.user_id]).
229            order_by(addresses.c.id)).first()
230        eq_(a.user_id, 7)
231        eq_(a.id, 1)
232        # email address auto-defers
233        assert 'email_addres' not in a.__dict__
234        eq_(a.email_address, 'jack@bean.com')
235
236    def test_column_not_present(self):
237        users, addresses, User = (self.tables.users,
238                                  self.tables.addresses,
239                                  self.classes.User)
240
241        assert_raises_message(sa.exc.ArgumentError,
242                              "not represented in the mapper's table",
243                              mapper, User, users,
244                              properties={'foo': addresses.c.user_id})
245
246    def test_constructor_exc(self):
247        """TypeError is raised for illegal constructor args,
248        whether or not explicit __init__ is present [ticket:908]."""
249
250        users, addresses = self.tables.users, self.tables.addresses
251
252        class Foo(object):
253
254            def __init__(self):
255                pass
256
257        class Bar(object):
258            pass
259
260        mapper(Foo, users)
261        mapper(Bar, addresses)
262        assert_raises(TypeError, Foo, x=5)
263        assert_raises(TypeError, Bar, x=5)
264
265    def test_sort_states_comparisons(self):
266        """test that _sort_states() doesn't compare
267        insert_order to state.key, for set of mixed
268        persistent/pending.  In particular Python 3 disallows
269        this.
270
271        """
272        class Foo(object):
273
274            def __init__(self, id):
275                self.id = id
276        m = MetaData()
277        foo_t = Table('foo', m,
278                      Column('id', String, primary_key=True)
279                      )
280        m = mapper(Foo, foo_t)
281
282        class DontCompareMeToString(int):
283            if util.py2k:
284                def __lt__(self, other):
285                    assert not isinstance(other, basestring)
286                    return int(self) < other
287
288        foos = [Foo(id='f%d' % i) for i in range(5)]
289        states = [attributes.instance_state(f) for f in foos]
290
291        for s in states[0:3]:
292            s.key = m._identity_key_from_state(s)
293        states[3].insert_order = DontCompareMeToString(5)
294        states[4].insert_order = DontCompareMeToString(1)
295        states[2].insert_order = DontCompareMeToString(3)
296        eq_(
297            _sort_states(states),
298            [states[4], states[3], states[0], states[1], states[2]]
299        )
300
301    def test_props(self):
302        users, Address, addresses, User = (self.tables.users,
303                                           self.classes.Address,
304                                           self.tables.addresses,
305                                           self.classes.User)
306
307        m = mapper(User, users, properties={
308            'addresses': relationship(mapper(Address, addresses))
309        })
310        assert User.addresses.property is m.get_property('addresses')
311
312    def test_unicode_relationship_backref_names(self):
313        # test [ticket:2901]
314        users, Address, addresses, User = (self.tables.users,
315                                           self.classes.Address,
316                                           self.tables.addresses,
317                                           self.classes.User)
318
319        mapper(Address, addresses)
320        mapper(User, users, properties={
321            util.u('addresses'): relationship(Address, backref=util.u('user'))
322        })
323        u1 = User()
324        a1 = Address()
325        u1.addresses.append(a1)
326        assert a1.user is u1
327
328    def test_configure_on_prop_1(self):
329        users, Address, addresses, User = (self.tables.users,
330                                           self.classes.Address,
331                                           self.tables.addresses,
332                                           self.classes.User)
333
334        mapper(User, users, properties={
335            'addresses': relationship(mapper(Address, addresses))
336        })
337        User.addresses.any(Address.email_address == 'foo@bar.com')
338
339    def test_configure_on_prop_2(self):
340        users, Address, addresses, User = (self.tables.users,
341                                           self.classes.Address,
342                                           self.tables.addresses,
343                                           self.classes.User)
344
345        mapper(User, users, properties={
346            'addresses': relationship(mapper(Address, addresses))
347        })
348        eq_(str(User.id == 3), str(users.c.id == 3))
349
350    def test_configure_on_prop_3(self):
351        users, addresses, User = (self.tables.users,
352                                  self.tables.addresses,
353                                  self.classes.User)
354
355        class Foo(User):
356            pass
357
358        mapper(User, users)
359        mapper(Foo, addresses, inherits=User, properties={
360            'address_id': addresses.c.id
361        })
362        assert getattr(Foo().__class__, 'name').impl is not None
363
364    def test_deferred_subclass_attribute_instrument(self):
365        users, addresses, User = (self.tables.users,
366                                  self.tables.addresses,
367                                  self.classes.User)
368
369        class Foo(User):
370            pass
371
372        mapper(User, users)
373        configure_mappers()
374        mapper(Foo, addresses, inherits=User, properties={
375            'address_id': addresses.c.id
376        })
377        assert getattr(Foo().__class__, 'name').impl is not None
378
379    def test_class_hier_only_instrument_once_multiple_configure(self):
380        users, addresses = (self.tables.users, self.tables.addresses)
381
382        class A(object):
383            pass
384
385        class ASub(A):
386            pass
387
388        class ASubSub(ASub):
389            pass
390
391        class B(object):
392            pass
393
394        from sqlalchemy.testing import mock
395        from sqlalchemy.orm.attributes import register_attribute_impl
396
397        with mock.patch(
398            "sqlalchemy.orm.attributes.register_attribute_impl",
399            side_effect=register_attribute_impl
400        ) as some_mock:
401
402            mapper(A, users, properties={
403                'bs': relationship(B)
404            })
405            mapper(B, addresses)
406
407            configure_mappers()
408
409            mapper(ASub, inherits=A)
410            mapper(ASubSub, inherits=ASub)
411
412            configure_mappers()
413
414        b_calls = [
415            c for c in some_mock.mock_calls if c[1][1] == 'bs'
416        ]
417        eq_(len(b_calls), 3)
418
419    def test_check_descriptor_as_method(self):
420        User, users = self.classes.User, self.tables.users
421
422        m = mapper(User, users)
423
424        class MyClass(User):
425
426            def foo(self):
427                pass
428        m._is_userland_descriptor(MyClass.foo)
429
430    def test_configure_on_get_props_1(self):
431        User, users = self.classes.User, self.tables.users
432
433        m = mapper(User, users)
434        assert not m.configured
435        assert list(m.iterate_properties)
436        assert m.configured
437
438    def test_configure_on_get_props_2(self):
439        User, users = self.classes.User, self.tables.users
440
441        m = mapper(User, users)
442        assert not m.configured
443        assert m.get_property('name')
444        assert m.configured
445
446    def test_configure_on_get_props_3(self):
447        users, Address, addresses, User = (self.tables.users,
448                                           self.classes.Address,
449                                           self.tables.addresses,
450                                           self.classes.User)
451
452        m = mapper(User, users)
453        assert not m.configured
454        configure_mappers()
455
456        m2 = mapper(Address, addresses, properties={
457            'user': relationship(User, backref='addresses')
458        })
459        assert m.get_property('addresses')
460
461    def test_info(self):
462        users = self.tables.users
463        Address = self.classes.Address
464
465        class MyComposite(object):
466            pass
467        for constructor, args in [
468            (column_property, (users.c.name,)),
469            (relationship, (Address,)),
470            (composite, (MyComposite, 'id', 'name')),
471            (synonym, 'foo'),
472            (comparable_property, 'foo')
473        ]:
474            obj = constructor(info={"x": "y"}, *args)
475            eq_(obj.info, {"x": "y"})
476            obj.info["q"] = "p"
477            eq_(obj.info, {"x": "y", "q": "p"})
478
479            obj = constructor(*args)
480            eq_(obj.info, {})
481            obj.info["q"] = "p"
482            eq_(obj.info, {"q": "p"})
483
484    def test_info_via_instrumented(self):
485        m = MetaData()
486        # create specific tables here as we don't want
487        # users.c.id.info to be pre-initialized
488        users = Table('u', m, Column('id', Integer, primary_key=True),
489                      Column('name', String))
490        addresses = Table('a', m, Column('id', Integer, primary_key=True),
491                          Column('name', String),
492                          Column('user_id', Integer, ForeignKey('u.id')))
493        Address = self.classes.Address
494        User = self.classes.User
495
496        mapper(User, users, properties={
497            "name_lower": column_property(func.lower(users.c.name)),
498            "addresses": relationship(Address)
499        })
500        mapper(Address, addresses)
501
502        # attr.info goes down to the original Column object
503        # for the dictionary.  The annotated element needs to pass
504        # this on.
505        assert 'info' not in users.c.id.__dict__
506        is_(User.id.info, users.c.id.info)
507        assert 'info' in users.c.id.__dict__
508
509        # for SQL expressions, ORM-level .info
510        is_(User.name_lower.info, User.name_lower.property.info)
511
512        # same for relationships
513        is_(User.addresses.info, User.addresses.property.info)
514
515    def test_add_property(self):
516        users, addresses, Address = (self.tables.users,
517                                     self.tables.addresses,
518                                     self.classes.Address)
519
520        assert_col = []
521
522        class User(fixtures.ComparableEntity):
523
524            def _get_name(self):
525                assert_col.append(('get', self._name))
526                return self._name
527
528            def _set_name(self, name):
529                assert_col.append(('set', name))
530                self._name = name
531            name = property(_get_name, _set_name)
532
533            def _uc_name(self):
534                if self._name is None:
535                    return None
536                return self._name.upper()
537            uc_name = property(_uc_name)
538            uc_name2 = property(_uc_name)
539
540        m = mapper(User, users)
541        mapper(Address, addresses)
542
543        class UCComparator(sa.orm.PropComparator):
544            __hash__ = None
545
546            def __eq__(self, other):
547                cls = self.prop.parent.class_
548                col = getattr(cls, 'name')
549                if other is None:
550                    return col is None
551                else:
552                    return sa.func.upper(col) == sa.func.upper(other)
553
554        m.add_property('_name', deferred(users.c.name))
555        m.add_property('name', synonym('_name'))
556        m.add_property('addresses', relationship(Address))
557        m.add_property('uc_name', sa.orm.comparable_property(UCComparator))
558        m.add_property('uc_name2', sa.orm.comparable_property(
559            UCComparator, User.uc_name2))
560
561        sess = create_session(autocommit=False)
562        assert sess.query(User).get(7)
563
564        u = sess.query(User).filter_by(name='jack').one()
565
566        def go():
567            eq_(len(u.addresses),
568                len(self.static.user_address_result[0].addresses))
569            eq_(u.name, 'jack')
570            eq_(u.uc_name, 'JACK')
571            eq_(u.uc_name2, 'JACK')
572            eq_(assert_col, [('get', 'jack')], str(assert_col))
573        self.sql_count_(2, go)
574
575        u.name = 'ed'
576        u3 = User()
577        u3.name = 'some user'
578        sess.add(u3)
579        sess.flush()
580        sess.rollback()
581
582    def test_add_prop_via_backref_resets_memoizations_reconfigures(self):
583        users, User = self.tables.users, self.classes.User
584        addresses, Address = self.tables.addresses, self.classes.Address
585
586        m1 = mapper(User, users)
587        User()
588
589        m2 = mapper(Address, addresses, properties={
590            'user': relationship(User, backref="addresses")
591        })
592        # configure mappers takes place when User is generated
593        User()
594        assert hasattr(User, 'addresses')
595        assert "addresses" in [p.key for p in m1._polymorphic_properties]
596
597    def test_replace_col_prop_w_syn(self):
598        users, User = self.tables.users, self.classes.User
599
600        m = mapper(User, users)
601        m.add_property('_name', users.c.name)
602        m.add_property('name', synonym('_name'))
603
604        sess = create_session()
605        u = sess.query(User).filter_by(name='jack').one()
606        eq_(u._name, 'jack')
607        eq_(u.name, 'jack')
608        u.name = 'jacko'
609        assert m._columntoproperty[users.c.name] is m.get_property('_name')
610
611        sa.orm.clear_mappers()
612
613        m = mapper(User, users)
614        m.add_property('name', synonym('_name', map_column=True))
615
616        sess.expunge_all()
617        u = sess.query(User).filter_by(name='jack').one()
618        eq_(u._name, 'jack')
619        eq_(u.name, 'jack')
620        u.name = 'jacko'
621        assert m._columntoproperty[users.c.name] is m.get_property('_name')
622
623    def test_replace_rel_prop_with_rel_warns(self):
624        users, User = self.tables.users, self.classes.User
625        addresses, Address = self.tables.addresses, self.classes.Address
626
627        m = mapper(User, users, properties={
628            "addresses": relationship(Address)
629        })
630        mapper(Address, addresses)
631
632        assert_raises_message(
633            sa.exc.SAWarning,
634            "Property User.addresses on Mapper|User|users being replaced "
635            "with new property User.addresses; the old property will "
636            "be discarded",
637            m.add_property,
638            "addresses", relationship(Address)
639        )
640
641    def test_add_column_prop_deannotate(self):
642        User, users = self.classes.User, self.tables.users
643        Address, addresses = self.classes.Address, self.tables.addresses
644
645        class SubUser(User):
646            pass
647        m = mapper(User, users)
648        m2 = mapper(SubUser, addresses, inherits=User, properties={
649            'address_id': addresses.c.id
650        })
651        m3 = mapper(Address, addresses, properties={
652            'foo': relationship(m2)
653        })
654        # add property using annotated User.name,
655        # needs to be deannotated
656        m.add_property("x", column_property(User.name + "name"))
657        s = create_session()
658        q = s.query(m2).select_from(Address).join(Address.foo)
659        self.assert_compile(
660            q,
661            "SELECT "
662            "addresses_1.id AS addresses_1_id, "
663            "users_1.id AS users_1_id, "
664            "users_1.name AS users_1_name, "
665            "addresses_1.user_id AS addresses_1_user_id, "
666            "addresses_1.email_address AS "
667            "addresses_1_email_address, "
668            "users_1.name || :name_1 AS anon_1 "
669            "FROM addresses JOIN (users AS users_1 JOIN addresses "
670            "AS addresses_1 ON users_1.id = "
671            "addresses_1.user_id) ON "
672            "users_1.id = addresses.user_id"
673        )
674
675    def test_column_prop_deannotate(self):
676        """test that column property deannotates,
677        bringing expressions down to the original mapped columns.
678        """
679        User, users = self.classes.User, self.tables.users
680        m = mapper(User, users)
681        assert User.id.property.columns[0] is users.c.id
682        assert User.name.property.columns[0] is users.c.name
683        expr = User.name + "name"
684        expr2 = sa.select([User.name, users.c.id])
685        m.add_property("x", column_property(expr))
686        m.add_property("y", column_property(expr2))
687
688        assert User.x.property.columns[0] is not expr
689        assert User.x.property.columns[0].element.left is users.c.name
690        # a deannotate needs to clone the base, in case
691        # the original one referenced annotated elements.
692        assert User.x.property.columns[0].element.right is not expr.right
693
694        assert User.y.property.columns[0] is not expr2
695        assert User.y.property.columns[0].element.\
696            _raw_columns[0] is users.c.name
697        assert User.y.property.columns[0].element.\
698            _raw_columns[1] is users.c.id
699
700    def test_synonym_replaces_backref(self):
701        addresses, users, User = (self.tables.addresses,
702                                  self.tables.users,
703                                  self.classes.User)
704
705        assert_calls = []
706
707        class Address(object):
708
709            def _get_user(self):
710                assert_calls.append("get")
711                return self._user
712
713            def _set_user(self, user):
714                assert_calls.append("set")
715                self._user = user
716            user = property(_get_user, _set_user)
717
718        # synonym is created against nonexistent prop
719        mapper(Address, addresses, properties={
720            'user': synonym('_user')
721        })
722        sa.orm.configure_mappers()
723
724        # later, backref sets up the prop
725        mapper(User, users, properties={
726            'addresses': relationship(Address, backref='_user')
727        })
728
729        sess = create_session()
730        u1 = sess.query(User).get(7)
731        u2 = sess.query(User).get(8)
732        # comparaison ops need to work
733        a1 = sess.query(Address).filter(Address.user == u1).one()
734        eq_(a1.id, 1)
735        a1.user = u2
736        assert a1.user is u2
737        eq_(assert_calls, ["set", "get"])
738
739    def test_self_ref_synonym(self):
740        t = Table('nodes', MetaData(),
741                  Column(
742                      'id', Integer, primary_key=True,
743                      test_needs_autoincrement=True),
744                  Column('parent_id', Integer, ForeignKey('nodes.id')))
745
746        class Node(object):
747            pass
748
749        mapper(Node, t, properties={
750            '_children': relationship(
751                Node, backref=backref('_parent', remote_side=t.c.id)),
752            'children': synonym('_children'),
753            'parent': synonym('_parent')
754        })
755
756        n1 = Node()
757        n2 = Node()
758        n1.children.append(n2)
759        assert n2.parent is n2._parent is n1
760        assert n1.children[0] is n1._children[0] is n2
761        eq_(str(Node.parent == n2), ":param_1 = nodes.parent_id")
762
763    def test_non_primary_identity_class(self):
764        User = self.classes.User
765        users, addresses = self.tables.users, self.tables.addresses
766
767        class AddressUser(User):
768            pass
769        m1 = mapper(User, users, polymorphic_identity='user')
770        m2 = mapper(AddressUser, addresses, inherits=User,
771                    polymorphic_identity='address', properties={
772                        'address_id': addresses.c.id
773                    })
774        m3 = mapper(AddressUser, addresses, non_primary=True)
775        assert m3._identity_class is m2._identity_class
776        eq_(
777            m2.identity_key_from_instance(AddressUser()),
778            m3.identity_key_from_instance(AddressUser())
779        )
780
781    def test_reassign_polymorphic_identity_warns(self):
782        User = self.classes.User
783        users = self.tables.users
784
785        class MyUser(User):
786            pass
787        m1 = mapper(User, users, polymorphic_on=users.c.name,
788                    polymorphic_identity='user')
789        assert_raises_message(
790            sa.exc.SAWarning,
791            "Reassigning polymorphic association for identity 'user'",
792            mapper,
793            MyUser, users, inherits=User, polymorphic_identity='user'
794        )
795
796    def test_illegal_non_primary(self):
797        users, Address, addresses, User = (self.tables.users,
798                                           self.classes.Address,
799                                           self.tables.addresses,
800                                           self.classes.User)
801
802        mapper(User, users)
803        mapper(Address, addresses)
804        mapper(User, users, non_primary=True, properties={
805            'addresses': relationship(Address)
806        })
807        assert_raises_message(
808            sa.exc.ArgumentError,
809            "Attempting to assign a new relationship 'addresses' "
810            "to a non-primary mapper on class 'User'",
811            configure_mappers
812        )
813
814    def test_illegal_non_primary_2(self):
815        User, users = self.classes.User, self.tables.users
816
817        assert_raises_message(
818            sa.exc.InvalidRequestError,
819            "Configure a primary mapper first",
820            mapper, User, users, non_primary=True)
821
822    def test_illegal_non_primary_3(self):
823        users, addresses = self.tables.users, self.tables.addresses
824
825        class Base(object):
826            pass
827
828        class Sub(Base):
829            pass
830        mapper(Base, users)
831        assert_raises_message(sa.exc.InvalidRequestError,
832                              "Configure a primary mapper first",
833                              mapper, Sub, addresses, non_primary=True
834                              )
835
836    def test_prop_filters(self):
837        t = Table('person', MetaData(),
838                  Column('id', Integer, primary_key=True,
839                         test_needs_autoincrement=True),
840                  Column('type', String(128)),
841                  Column('name', String(128)),
842                  Column('employee_number', Integer),
843                  Column('boss_id', Integer, ForeignKey('person.id')),
844                  Column('vendor_id', Integer))
845
846        class Person(object):
847            pass
848
849        class Vendor(Person):
850            pass
851
852        class Employee(Person):
853            pass
854
855        class Manager(Employee):
856            pass
857
858        class Hoho(object):
859            pass
860
861        class Lala(object):
862            pass
863
864        class Fub(object):
865            pass
866
867        class Frob(object):
868            pass
869
870        class HasDef(object):
871
872            def name(self):
873                pass
874
875        class Empty(object):
876            pass
877
878        mapper(
879            Empty, t, properties={'empty_id': t.c.id},
880            include_properties=[])
881        p_m = mapper(Person, t, polymorphic_on=t.c.type,
882                     include_properties=('id', 'type', 'name'))
883        e_m = mapper(Employee, inherits=p_m,
884                     polymorphic_identity='employee',
885                     properties={
886                         'boss': relationship(
887                             Manager, backref=backref('peon'),
888                             remote_side=t.c.id)},
889                     exclude_properties=('vendor_id', ))
890
891        mapper(
892            Manager, inherits=e_m, polymorphic_identity='manager',
893            include_properties=('id', 'type'))
894
895        mapper(
896            Vendor, inherits=p_m, polymorphic_identity='vendor',
897            exclude_properties=('boss_id', 'employee_number'))
898        mapper(Hoho, t, include_properties=('id', 'type', 'name'))
899        mapper(
900            Lala, t, exclude_properties=('vendor_id', 'boss_id'),
901            column_prefix="p_")
902
903        mapper(HasDef, t, column_prefix="h_")
904
905        mapper(Fub, t, include_properties=(t.c.id, t.c.type))
906        mapper(
907            Frob, t, column_prefix='f_',
908            exclude_properties=(
909                t.c.boss_id,
910                'employee_number', t.c.vendor_id))
911
912        configure_mappers()
913
914        def assert_props(cls, want):
915            have = set([n for n in dir(cls) if not n.startswith('_')])
916            want = set(want)
917            eq_(have, want)
918
919        def assert_instrumented(cls, want):
920            have = set([p.key for p in class_mapper(cls).iterate_properties])
921            want = set(want)
922            eq_(have, want)
923
924        assert_props(HasDef, ['h_boss_id', 'h_employee_number', 'h_id',
925                              'name', 'h_name', 'h_vendor_id', 'h_type'])
926        assert_props(Person, ['id', 'name', 'type'])
927        assert_instrumented(Person, ['id', 'name', 'type'])
928        assert_props(Employee, ['boss', 'boss_id', 'employee_number',
929                                'id', 'name', 'type'])
930        assert_instrumented(Employee, ['boss', 'boss_id', 'employee_number',
931                                       'id', 'name', 'type'])
932        assert_props(Manager, ['boss', 'boss_id', 'employee_number', 'peon',
933                               'id', 'name', 'type'])
934
935        # 'peon' and 'type' are both explicitly stated properties
936        assert_instrumented(Manager, ['peon', 'type', 'id'])
937
938        assert_props(Vendor, ['vendor_id', 'id', 'name', 'type'])
939        assert_props(Hoho, ['id', 'name', 'type'])
940        assert_props(Lala, ['p_employee_number', 'p_id', 'p_name', 'p_type'])
941        assert_props(Fub, ['id', 'type'])
942        assert_props(Frob, ['f_id', 'f_type', 'f_name', ])
943
944        # putting the discriminator column in exclude_properties,
945        # very weird.  As of 0.7.4 this re-maps it.
946        class Foo(Person):
947            pass
948        assert_props(Empty, ['empty_id'])
949
950        mapper(
951            Foo, inherits=Person, polymorphic_identity='foo',
952            exclude_properties=('type', ),
953        )
954        assert hasattr(Foo, "type")
955        assert Foo.type.property.columns[0] is t.c.type
956
957    @testing.provide_metadata
958    def test_prop_filters_defaults(self):
959        metadata = self.metadata
960        t = Table('t', metadata,
961                  Column(
962                      'id', Integer(), primary_key=True,
963                      test_needs_autoincrement=True),
964                  Column('x', Integer(), nullable=False, server_default='0')
965                  )
966        t.create()
967
968        class A(object):
969            pass
970        mapper(A, t, include_properties=['id'])
971        s = Session()
972        s.add(A())
973        s.commit()
974
975    def test_we_dont_call_bool(self):
976        class NoBoolAllowed(object):
977
978            def __bool__(self):
979                raise Exception("nope")
980        mapper(NoBoolAllowed, self.tables.users)
981        u1 = NoBoolAllowed()
982        u1.name = "some name"
983        s = Session(testing.db)
984        s.add(u1)
985        s.commit()
986        assert s.query(NoBoolAllowed).get(u1.id) is u1
987
988    def test_we_dont_call_eq(self):
989        class NoEqAllowed(object):
990
991            def __eq__(self, other):
992                raise Exception("nope")
993
994        addresses, users = self.tables.addresses, self.tables.users
995        Address = self.classes.Address
996
997        mapper(NoEqAllowed, users, properties={
998            'addresses': relationship(Address, backref='user')
999        })
1000        mapper(Address, addresses)
1001
1002        u1 = NoEqAllowed()
1003        u1.name = "some name"
1004        u1.addresses = [Address(id=12, email_address='a1')]
1005        s = Session(testing.db)
1006        s.add(u1)
1007        s.commit()
1008
1009        a1 = s.query(Address).filter_by(id=12).one()
1010        assert a1.user is u1
1011
1012    def test_mapping_to_join_raises(self):
1013        """Test implicit merging of two cols raises."""
1014
1015        addresses, users, User = (self.tables.addresses,
1016                                  self.tables.users,
1017                                  self.classes.User)
1018
1019        usersaddresses = sa.join(users, addresses,
1020                                 users.c.id == addresses.c.user_id)
1021        assert_raises_message(
1022            sa.exc.InvalidRequestError,
1023            "Implicitly",
1024            mapper, User, usersaddresses, primary_key=[users.c.id]
1025        )
1026
1027    def test_mapping_to_join_explicit_prop(self):
1028        """Mapping to a join"""
1029
1030        User, addresses, users = (self.classes.User,
1031                                  self.tables.addresses,
1032                                  self.tables.users)
1033
1034        usersaddresses = sa.join(users, addresses, users.c.id
1035                                 == addresses.c.user_id)
1036        mapper(User, usersaddresses, primary_key=[users.c.id],
1037               properties={'add_id': addresses.c.id}
1038               )
1039        result = create_session().query(User).order_by(users.c.id).all()
1040        eq_(result, self.static.user_result[:3])
1041
1042    def test_mapping_to_join_exclude_prop(self):
1043        """Mapping to a join"""
1044
1045        User, addresses, users = (self.classes.User,
1046                                  self.tables.addresses,
1047                                  self.tables.users)
1048
1049        usersaddresses = sa.join(users, addresses, users.c.id
1050                                 == addresses.c.user_id)
1051        mapper(User, usersaddresses, primary_key=[users.c.id],
1052               exclude_properties=[addresses.c.id]
1053               )
1054        result = create_session().query(User).order_by(users.c.id).all()
1055        eq_(result, self.static.user_result[:3])
1056
1057    def test_mapping_to_join_no_pk(self):
1058        email_bounces, addresses, Address = (self.tables.email_bounces,
1059                                             self.tables.addresses,
1060                                             self.classes.Address)
1061
1062        m = mapper(Address,
1063                   addresses.join(email_bounces),
1064                   properties={'id': [addresses.c.id, email_bounces.c.id]}
1065                   )
1066        configure_mappers()
1067        assert addresses in m._pks_by_table
1068        assert email_bounces not in m._pks_by_table
1069
1070        sess = create_session()
1071        a = Address(id=10, email_address='e1')
1072        sess.add(a)
1073        sess.flush()
1074
1075        eq_(
1076            select([func.count('*')]).select_from(addresses).scalar(), 6)
1077        eq_(
1078            select([func.count('*')]).select_from(email_bounces).scalar(), 5)
1079
1080    def test_mapping_to_outerjoin(self):
1081        """Mapping to an outer join with a nullable composite primary key."""
1082
1083        users, addresses, User = (self.tables.users,
1084                                  self.tables.addresses,
1085                                  self.classes.User)
1086
1087        mapper(User, users.outerjoin(addresses),
1088               primary_key=[users.c.id, addresses.c.id],
1089               properties=dict(
1090            address_id=addresses.c.id))
1091
1092        session = create_session()
1093        result = session.query(User).order_by(User.id, User.address_id).all()
1094
1095        eq_(result, [
1096            User(id=7, address_id=1),
1097            User(id=8, address_id=2),
1098            User(id=8, address_id=3),
1099            User(id=8, address_id=4),
1100            User(id=9, address_id=5),
1101            User(id=10, address_id=None)])
1102
1103    def test_mapping_to_outerjoin_no_partial_pks(self):
1104        """test the allow_partial_pks=False flag."""
1105
1106        users, addresses, User = (self.tables.users,
1107                                  self.tables.addresses,
1108                                  self.classes.User)
1109
1110        mapper(User, users.outerjoin(addresses),
1111               allow_partial_pks=False,
1112               primary_key=[users.c.id, addresses.c.id],
1113               properties=dict(
1114            address_id=addresses.c.id))
1115
1116        session = create_session()
1117        result = session.query(User).order_by(User.id, User.address_id).all()
1118
1119        eq_(result, [
1120            User(id=7, address_id=1),
1121            User(id=8, address_id=2),
1122            User(id=8, address_id=3),
1123            User(id=8, address_id=4),
1124            User(id=9, address_id=5),
1125            None])
1126
1127    def test_scalar_pk_arg(self):
1128        users, Keyword, items, Item, User, keywords = (self.tables.users,
1129                                                       self.classes.Keyword,
1130                                                       self.tables.items,
1131                                                       self.classes.Item,
1132                                                       self.classes.User,
1133                                                       self.tables.keywords)
1134
1135        m1 = mapper(Item, items, primary_key=[items.c.id])
1136        m2 = mapper(Keyword, keywords, primary_key=keywords.c.id)
1137        m3 = mapper(User, users, primary_key=(users.c.id,))
1138
1139        assert m1.primary_key[0] is items.c.id
1140        assert m2.primary_key[0] is keywords.c.id
1141        assert m3.primary_key[0] is users.c.id
1142
1143    def test_custom_join(self):
1144        """select_from totally replace the FROM parameters."""
1145
1146        users, items, order_items, orders, Item, User, Order = (
1147            self.tables.users,
1148            self.tables.items,
1149            self.tables.order_items,
1150            self.tables.orders,
1151            self.classes.Item,
1152            self.classes.User,
1153            self.classes.Order)
1154
1155        mapper(Item, items)
1156
1157        mapper(Order, orders, properties=dict(
1158            items=relationship(Item, order_items)))
1159
1160        mapper(User, users, properties=dict(
1161            orders=relationship(Order)))
1162
1163        session = create_session()
1164        result = (session.query(User).
1165                  select_from(users.join(orders).
1166                              join(order_items).
1167                              join(items)).
1168                  filter(items.c.description == 'item 4')).all()
1169
1170        eq_(result, [self.static.user_result[0]])
1171
1172    @testing.uses_deprecated("Mapper.order_by")
1173    def test_cancel_order_by(self):
1174        users, User = self.tables.users, self.classes.User
1175
1176        mapper(User, users, order_by=users.c.name.desc())
1177
1178        assert "order by users.name desc" in \
1179            str(create_session().query(User).statement).lower()
1180        assert "order by" not in \
1181            str(create_session().query(User).order_by(None).statement).lower()
1182        assert "order by users.name asc" in \
1183            str(create_session().query(User).order_by(
1184                User.name.asc()).statement).lower()
1185
1186        eq_(
1187            create_session().query(User).all(),
1188            [User(id=7, name='jack'), User(id=9, name='fred'),
1189             User(id=8, name='ed'), User(id=10, name='chuck')]
1190        )
1191
1192        eq_(
1193            create_session().query(User).order_by(User.name).all(),
1194            [User(id=10, name='chuck'), User(id=8, name='ed'),
1195             User(id=9, name='fred'), User(id=7, name='jack')]
1196        )
1197
1198    # 'Raises a "expression evaluation not supported" error at prepare time
1199    @testing.fails_on('firebird', 'FIXME: unknown')
1200    def test_function(self):
1201        """Mapping to a SELECT statement that has functions in it."""
1202
1203        addresses, users, User = (self.tables.addresses,
1204                                  self.tables.users,
1205                                  self.classes.User)
1206
1207        s = sa.select([users,
1208                       (users.c.id * 2).label('concat'),
1209                       sa.func.count(addresses.c.id).label('count')],
1210                      users.c.id == addresses.c.user_id,
1211                      group_by=[c for c in users.c]).alias('myselect')
1212
1213        mapper(User, s)
1214        sess = create_session()
1215        result = sess.query(User).order_by(s.c.id).all()
1216
1217        for idx, total in enumerate((14, 16)):
1218            eq_(result[idx].concat, result[idx].id * 2)
1219            eq_(result[idx].concat, total)
1220
1221    def test_count(self):
1222        """The count function on Query."""
1223
1224        User, users = self.classes.User, self.tables.users
1225
1226        mapper(User, users)
1227
1228        session = create_session()
1229        q = session.query(User)
1230
1231        eq_(q.count(), 4)
1232        eq_(q.filter(User.id.in_([8, 9])).count(), 2)
1233        eq_(q.filter(users.c.id.in_([8, 9])).count(), 2)
1234
1235        eq_(session.query(User.id).count(), 4)
1236        eq_(session.query(User.id).filter(User.id.in_((8, 9))).count(), 2)
1237
1238    def test_many_to_many_count(self):
1239        keywords, items, item_keywords, Keyword, Item = (
1240            self.tables.keywords,
1241            self.tables.items,
1242            self.tables.item_keywords,
1243            self.classes.Keyword,
1244            self.classes.Item)
1245
1246        mapper(Keyword, keywords)
1247        mapper(Item, items, properties=dict(
1248            keywords=relationship(Keyword, item_keywords, lazy='select')))
1249
1250        session = create_session()
1251        q = (session.query(Item).
1252             join('keywords').
1253             distinct().
1254             filter(Keyword.name == "red"))
1255        eq_(q.count(), 2)
1256
1257    def test_override_1(self):
1258        """Overriding a column raises an error."""
1259
1260        Address, addresses, users, User = (self.classes.Address,
1261                                           self.tables.addresses,
1262                                           self.tables.users,
1263                                           self.classes.User)
1264
1265        def go():
1266            mapper(User, users,
1267                   properties=dict(
1268                       name=relationship(mapper(Address, addresses))))
1269
1270        assert_raises(sa.exc.ArgumentError, go)
1271
1272    def test_override_2(self):
1273        """exclude_properties cancels the error."""
1274
1275        Address, addresses, users, User = (self.classes.Address,
1276                                           self.tables.addresses,
1277                                           self.tables.users,
1278                                           self.classes.User)
1279
1280        mapper(User, users,
1281               exclude_properties=['name'],
1282               properties=dict(
1283                   name=relationship(mapper(Address, addresses))))
1284
1285        assert bool(User.name)
1286
1287    def test_override_3(self):
1288        """The column being named elsewhere also cancels the error,"""
1289
1290        Address, addresses, users, User = (self.classes.Address,
1291                                           self.tables.addresses,
1292                                           self.tables.users,
1293                                           self.classes.User)
1294
1295        mapper(User, users,
1296               properties=dict(
1297                   name=relationship(mapper(Address, addresses)),
1298                   foo=users.c.name))
1299
1300    def test_synonym(self):
1301        users, addresses, Address = (self.tables.users,
1302                                     self.tables.addresses,
1303                                     self.classes.Address)
1304
1305        assert_col = []
1306
1307        class extendedproperty(property):
1308            attribute = 123
1309
1310        class User(object):
1311
1312            def _get_name(self):
1313                assert_col.append(('get', self.name))
1314                return self.name
1315
1316            def _set_name(self, name):
1317                assert_col.append(('set', name))
1318                self.name = name
1319            uname = extendedproperty(_get_name, _set_name)
1320
1321        mapper(User, users, properties=dict(
1322            addresses=relationship(mapper(Address, addresses), lazy='select'),
1323            uname=synonym('name'),
1324            adlist=synonym('addresses'),
1325            adname=synonym('addresses')
1326        ))
1327
1328        # ensure the synonym can get at the proxied comparators without
1329        # an explicit compile
1330        User.name == 'ed'
1331        User.adname.any()
1332
1333        assert hasattr(User, 'adlist')
1334        # as of 0.4.2, synonyms always create a property
1335        assert hasattr(User, 'adname')
1336
1337        # test compile
1338        assert not isinstance(User.uname == 'jack', bool)
1339
1340        assert User.uname.property
1341        assert User.adlist.property
1342
1343        sess = create_session()
1344
1345        # test RowTuple names
1346        row = sess.query(User.id, User.uname).first()
1347        assert row.uname == row[1]
1348
1349        u = sess.query(User).filter(User.uname == 'jack').one()
1350
1351        fixture = self.static.user_address_result[0].addresses
1352        eq_(u.adlist, fixture)
1353
1354        addr = sess.query(Address).filter_by(id=fixture[0].id).one()
1355        u = sess.query(User).filter(User.adname.contains(addr)).one()
1356        u2 = sess.query(User).filter(User.adlist.contains(addr)).one()
1357
1358        assert u is u2
1359
1360        assert u not in sess.dirty
1361        u.uname = "some user name"
1362        assert len(assert_col) > 0
1363        eq_(assert_col, [('set', 'some user name')])
1364        eq_(u.uname, "some user name")
1365        eq_(assert_col, [('set', 'some user name'), ('get', 'some user name')])
1366        eq_(u.name, "some user name")
1367        assert u in sess.dirty
1368
1369        eq_(User.uname.attribute, 123)
1370
1371    def test_synonym_of_synonym(self):
1372        users, User = (self.tables.users,
1373                       self.classes.User)
1374
1375        mapper(User, users, properties={
1376            'x': synonym('id'),
1377            'y': synonym('x')
1378        })
1379
1380        s = Session()
1381        u = s.query(User).filter(User.y == 8).one()
1382        eq_(u.y, 8)
1383
1384    def test_synonym_of_non_property_raises(self):
1385        from sqlalchemy.ext.associationproxy import association_proxy
1386
1387        class User(object):
1388            pass
1389
1390        users, Address, addresses = (
1391            self.tables.users,
1392            self.classes.Address,
1393            self.tables.addresses)
1394
1395        mapper(User, users, properties={
1396            'y': synonym('x'),
1397            'addresses': relationship(Address)
1398        })
1399        mapper(Address, addresses)
1400        User.x = association_proxy("addresses", "email_address")
1401
1402        assert_raises_message(
1403            sa.exc.InvalidRequestError,
1404            r'synonym\(\) attribute "User.x" only supports ORM mapped '
1405            'attributes, got .*AssociationProxy',
1406            getattr, User.y, "property"
1407        )
1408
1409    def test_synonym_column_location(self):
1410        users, User = self.tables.users, self.classes.User
1411
1412        def go():
1413            mapper(User, users, properties={
1414                'not_name': synonym('_name', map_column=True)})
1415
1416        assert_raises_message(
1417            sa.exc.ArgumentError,
1418            ("Can't compile synonym '_name': no column on table "
1419             "'users' named 'not_name'"),
1420            go)
1421
1422    def test_column_synonyms(self):
1423        """Synonyms which automatically instrument properties,
1424        set up aliased column, etc."""
1425
1426        addresses, users, Address = (self.tables.addresses,
1427                                     self.tables.users,
1428                                     self.classes.Address)
1429
1430        assert_col = []
1431
1432        class User(object):
1433
1434            def _get_name(self):
1435                assert_col.append(('get', self._name))
1436                return self._name
1437
1438            def _set_name(self, name):
1439                assert_col.append(('set', name))
1440                self._name = name
1441            name = property(_get_name, _set_name)
1442
1443        mapper(Address, addresses)
1444        mapper(User, users, properties={
1445            'addresses': relationship(Address, lazy='select'),
1446            'name': synonym('_name', map_column=True)
1447        })
1448
1449        # test compile
1450        assert not isinstance(User.name == 'jack', bool)
1451
1452        assert hasattr(User, 'name')
1453        assert hasattr(User, '_name')
1454
1455        sess = create_session()
1456        u = sess.query(User).filter(User.name == 'jack').one()
1457        eq_(u.name, 'jack')
1458        u.name = 'foo'
1459        eq_(u.name, 'foo')
1460        eq_(assert_col, [('get', 'jack'), ('set', 'foo'), ('get', 'foo')])
1461
1462    def test_synonym_map_column_conflict(self):
1463        users, User = self.tables.users, self.classes.User
1464
1465        assert_raises(
1466            sa.exc.ArgumentError,
1467            mapper,
1468            User, users, properties=util.OrderedDict([
1469                ('_user_id', users.c.id),
1470                ('id', synonym('_user_id', map_column=True)),
1471            ])
1472        )
1473
1474        assert_raises(
1475            sa.exc.ArgumentError,
1476            mapper,
1477            User, users, properties=util.OrderedDict([
1478                ('id', synonym('_user_id', map_column=True)),
1479                ('_user_id', users.c.id),
1480            ])
1481        )
1482
1483    def test_comparable(self):
1484        users = self.tables.users
1485
1486        class extendedproperty(property):
1487            attribute = 123
1488
1489            def method1(self):
1490                return "method1"
1491
1492        from sqlalchemy.orm.properties import ColumnProperty
1493
1494        class UCComparator(ColumnProperty.Comparator):
1495            __hash__ = None
1496
1497            def method1(self):
1498                return "uccmethod1"
1499
1500            def method2(self, other):
1501                return "method2"
1502
1503            def __eq__(self, other):
1504                cls = self.prop.parent.class_
1505                col = getattr(cls, 'name')
1506                if other is None:
1507                    return col is None
1508                else:
1509                    return sa.func.upper(col) == sa.func.upper(other)
1510
1511        def map_(with_explicit_property):
1512            class User(object):
1513
1514                @extendedproperty
1515                def uc_name(self):
1516                    if self.name is None:
1517                        return None
1518                    return self.name.upper()
1519            if with_explicit_property:
1520                args = (UCComparator, User.uc_name)
1521            else:
1522                args = (UCComparator,)
1523            mapper(User, users, properties=dict(
1524                uc_name=sa.orm.comparable_property(*args)))
1525            return User
1526
1527        for User in (map_(True), map_(False)):
1528            sess = create_session()
1529            sess.begin()
1530            q = sess.query(User)
1531
1532            assert hasattr(User, 'name')
1533            assert hasattr(User, 'uc_name')
1534
1535            eq_(User.uc_name.method1(), "method1")
1536            eq_(User.uc_name.method2('x'), "method2")
1537
1538            assert_raises_message(
1539                AttributeError,
1540                "Neither 'extendedproperty' object nor 'UCComparator' "
1541                "object associated with User.uc_name has an attribute "
1542                "'nonexistent'",
1543                getattr, User.uc_name, 'nonexistent')
1544
1545            # test compile
1546            assert not isinstance(User.uc_name == 'jack', bool)
1547            u = q.filter(User.uc_name == 'JACK').one()
1548
1549            assert u.uc_name == "JACK"
1550            assert u not in sess.dirty
1551
1552            u.name = "some user name"
1553            eq_(u.name, "some user name")
1554            assert u in sess.dirty
1555            eq_(u.uc_name, "SOME USER NAME")
1556
1557            sess.flush()
1558            sess.expunge_all()
1559
1560            q = sess.query(User)
1561            u2 = q.filter(User.name == 'some user name').one()
1562            u3 = q.filter(User.uc_name == 'SOME USER NAME').one()
1563
1564            assert u2 is u3
1565
1566            eq_(User.uc_name.attribute, 123)
1567            sess.rollback()
1568
1569    def test_comparable_column(self):
1570        users, User = self.tables.users, self.classes.User
1571
1572        class MyComparator(sa.orm.properties.ColumnProperty.Comparator):
1573            __hash__ = None
1574
1575            def __eq__(self, other):
1576                # lower case comparison
1577                return func.lower(self.__clause_element__()
1578                                  ) == func.lower(other)
1579
1580            def intersects(self, other):
1581                # non-standard comparator
1582                return self.__clause_element__().op('&=')(other)
1583
1584        mapper(User, users, properties={
1585            'name': sa.orm.column_property(users.c.name,
1586                                           comparator_factory=MyComparator)
1587        })
1588
1589        assert_raises_message(
1590            AttributeError,
1591            "Neither 'InstrumentedAttribute' object nor "
1592            "'MyComparator' object associated with User.name has "
1593            "an attribute 'nonexistent'",
1594            getattr, User.name, "nonexistent")
1595
1596        eq_(
1597            str((User.name == 'ed').compile(
1598                dialect=sa.engine.default.DefaultDialect())),
1599            "lower(users.name) = lower(:lower_1)")
1600        eq_(
1601            str((User.name.intersects('ed')).compile(
1602                dialect=sa.engine.default.DefaultDialect())),
1603            "users.name &= :name_1")
1604
1605    def test_reentrant_compile(self):
1606        users, Address, addresses, User = (self.tables.users,
1607                                           self.classes.Address,
1608                                           self.tables.addresses,
1609                                           self.classes.User)
1610
1611        class MyFakeProperty(sa.orm.properties.ColumnProperty):
1612
1613            def post_instrument_class(self, mapper):
1614                super(MyFakeProperty, self).post_instrument_class(mapper)
1615                configure_mappers()
1616
1617        m1 = mapper(User, users, properties={
1618            'name': MyFakeProperty(users.c.name)
1619        })
1620        m2 = mapper(Address, addresses)
1621        configure_mappers()
1622
1623        sa.orm.clear_mappers()
1624
1625        class MyFakeProperty(sa.orm.properties.ColumnProperty):
1626
1627            def post_instrument_class(self, mapper):
1628                super(MyFakeProperty, self).post_instrument_class(mapper)
1629                configure_mappers()
1630
1631        m1 = mapper(User, users, properties={
1632            'name': MyFakeProperty(users.c.name)
1633        })
1634        m2 = mapper(Address, addresses)
1635        configure_mappers()
1636
1637    def test_reconstructor(self):
1638        users = self.tables.users
1639
1640        recon = []
1641
1642        class User(object):
1643
1644            @reconstructor
1645            def reconstruct(self):
1646                recon.append('go')
1647
1648        mapper(User, users)
1649
1650        User()
1651        eq_(recon, [])
1652        create_session().query(User).first()
1653        eq_(recon, ['go'])
1654
1655    def test_reconstructor_inheritance(self):
1656        users = self.tables.users
1657
1658        recon = []
1659
1660        class A(object):
1661
1662            @reconstructor
1663            def reconstruct(self):
1664                assert isinstance(self, A)
1665                recon.append('A')
1666
1667        class B(A):
1668
1669            @reconstructor
1670            def reconstruct(self):
1671                assert isinstance(self, B)
1672                recon.append('B')
1673
1674        class C(A):
1675
1676            @reconstructor
1677            def reconstruct(self):
1678                assert isinstance(self, C)
1679                recon.append('C')
1680
1681        mapper(A, users, polymorphic_on=users.c.name,
1682               polymorphic_identity='jack')
1683        mapper(B, inherits=A, polymorphic_identity='ed')
1684        mapper(C, inherits=A, polymorphic_identity='chuck')
1685
1686        A()
1687        B()
1688        C()
1689        eq_(recon, [])
1690
1691        sess = create_session()
1692        sess.query(A).first()
1693        sess.query(B).first()
1694        sess.query(C).first()
1695        eq_(recon, ['A', 'B', 'C'])
1696
1697    def test_unmapped_reconstructor_inheritance(self):
1698        users = self.tables.users
1699
1700        recon = []
1701
1702        class Base(object):
1703
1704            @reconstructor
1705            def reconstruct(self):
1706                recon.append('go')
1707
1708        class User(Base):
1709            pass
1710
1711        mapper(User, users)
1712
1713        User()
1714        eq_(recon, [])
1715
1716        create_session().query(User).first()
1717        eq_(recon, ['go'])
1718
1719    def test_unmapped_error(self):
1720        Address, addresses, users, User = (self.classes.Address,
1721                                           self.tables.addresses,
1722                                           self.tables.users,
1723                                           self.classes.User)
1724
1725        mapper(Address, addresses)
1726        sa.orm.clear_mappers()
1727
1728        mapper(User, users, properties={
1729            'addresses': relationship(Address)
1730        })
1731
1732        assert_raises_message(
1733            sa.orm.exc.UnmappedClassError,
1734            "Class 'test.orm._fixtures.Address' is not mapped",
1735            sa.orm.configure_mappers)
1736
1737    def test_unmapped_not_type_error(self):
1738        assert_raises_message(
1739            sa.exc.ArgumentError,
1740            "Class object expected, got '5'.",
1741            class_mapper, 5
1742        )
1743
1744    def test_unmapped_not_type_error_iter_ok(self):
1745        assert_raises_message(
1746            sa.exc.ArgumentError,
1747            r"Class object expected, got '\(5, 6\)'.",
1748            class_mapper, (5, 6)
1749        )
1750
1751    def test_attribute_error_raised_class_mapper(self):
1752        users = self.tables.users
1753        addresses = self.tables.addresses
1754        User = self.classes.User
1755        Address = self.classes.Address
1756
1757        mapper(User, users, properties={
1758            "addresses": relationship(
1759                Address,
1760                primaryjoin=lambda: users.c.id == addresses.wrong.user_id)
1761        })
1762        mapper(Address, addresses)
1763        assert_raises_message(
1764            AttributeError,
1765            "'Table' object has no attribute 'wrong'",
1766            class_mapper, Address
1767        )
1768
1769    def test_key_error_raised_class_mapper(self):
1770        users = self.tables.users
1771        addresses = self.tables.addresses
1772        User = self.classes.User
1773        Address = self.classes.Address
1774
1775        mapper(User, users, properties={
1776            "addresses": relationship(Address,
1777                                      primaryjoin=lambda: users.c.id ==
1778                                      addresses.__dict__['wrong'].user_id)
1779        })
1780        mapper(Address, addresses)
1781        assert_raises_message(
1782            KeyError,
1783            "wrong",
1784            class_mapper, Address
1785        )
1786
1787    def test_unmapped_subclass_error_postmap(self):
1788        users = self.tables.users
1789
1790        class Base(object):
1791            pass
1792
1793        class Sub(Base):
1794            pass
1795
1796        mapper(Base, users)
1797        sa.orm.configure_mappers()
1798
1799        # we can create new instances, set attributes.
1800        s = Sub()
1801        s.name = 'foo'
1802        eq_(s.name, 'foo')
1803        eq_(
1804            attributes.get_history(s, 'name'),
1805            (['foo'], (), ())
1806        )
1807
1808        # using it with an ORM operation, raises
1809        assert_raises(sa.orm.exc.UnmappedClassError,
1810                      create_session().add, Sub())
1811
1812    def test_unmapped_subclass_error_premap(self):
1813        users = self.tables.users
1814
1815        class Base(object):
1816            pass
1817
1818        mapper(Base, users)
1819
1820        class Sub(Base):
1821            pass
1822
1823        sa.orm.configure_mappers()
1824
1825        # we can create new instances, set attributes.
1826        s = Sub()
1827        s.name = 'foo'
1828        eq_(s.name, 'foo')
1829        eq_(
1830            attributes.get_history(s, 'name'),
1831            (['foo'], (), ())
1832        )
1833
1834        # using it with an ORM operation, raises
1835        assert_raises(sa.orm.exc.UnmappedClassError,
1836                      create_session().add, Sub())
1837
1838    def test_oldstyle_mixin(self):
1839        users = self.tables.users
1840
1841        class OldStyle:
1842            pass
1843
1844        class NewStyle(object):
1845            pass
1846
1847        class A(NewStyle, OldStyle):
1848            pass
1849
1850        mapper(A, users)
1851
1852        class B(OldStyle, NewStyle):
1853            pass
1854
1855        mapper(B, users)
1856
1857
1858class DocumentTest(fixtures.TestBase):
1859
1860    def test_doc_propagate(self):
1861        metadata = MetaData()
1862        t1 = Table('t1', metadata,
1863                   Column('col1', Integer, primary_key=True,
1864                          doc="primary key column"),
1865                   Column('col2', String, doc="data col"),
1866                   Column('col3', String, doc="data col 2"),
1867                   Column('col4', String, doc="data col 3"),
1868                   Column('col5', String),
1869                   )
1870        t2 = Table('t2', metadata,
1871                   Column('col1', Integer, primary_key=True,
1872                          doc="primary key column"),
1873                   Column('col2', String, doc="data col"),
1874                   Column('col3', Integer, ForeignKey('t1.col1'),
1875                          doc="foreign key to t1.col1")
1876                   )
1877
1878        class Foo(object):
1879            pass
1880
1881        class Bar(object):
1882            pass
1883
1884        mapper(Foo, t1, properties={
1885            'bars': relationship(Bar,
1886                                 doc="bar relationship",
1887                                 backref=backref('foo', doc='foo relationship')
1888                                 ),
1889            'foober': column_property(t1.c.col3, doc='alternate data col'),
1890            'hoho': synonym("col4", doc="syn of col4")
1891        })
1892        mapper(Bar, t2)
1893        configure_mappers()
1894        eq_(Foo.col1.__doc__, "primary key column")
1895        eq_(Foo.col2.__doc__, "data col")
1896        eq_(Foo.col5.__doc__, None)
1897        eq_(Foo.foober.__doc__, "alternate data col")
1898        eq_(Foo.bars.__doc__, "bar relationship")
1899        eq_(Foo.hoho.__doc__, "syn of col4")
1900        eq_(Bar.col1.__doc__, "primary key column")
1901        eq_(Bar.foo.__doc__, "foo relationship")
1902
1903
1904class ORMLoggingTest(_fixtures.FixtureTest):
1905
1906    def setup(self):
1907        self.buf = logging.handlers.BufferingHandler(100)
1908        for log in [
1909            logging.getLogger('sqlalchemy.orm'),
1910        ]:
1911            log.addHandler(self.buf)
1912
1913    def teardown(self):
1914        for log in [
1915            logging.getLogger('sqlalchemy.orm'),
1916        ]:
1917            log.removeHandler(self.buf)
1918
1919    def _current_messages(self):
1920        return [b.getMessage() for b in self.buf.buffer]
1921
1922    def test_mapper_info_aliased(self):
1923        User, users = self.classes.User, self.tables.users
1924        tb = users.select().alias()
1925        mapper(User, tb)
1926        s = Session()
1927        s.add(User(name='ed'))
1928        s.commit()
1929
1930        for msg in self._current_messages():
1931            assert msg.startswith('(User|%%(%d anon)s) ' % id(tb))
1932
1933
1934class OptionsTest(_fixtures.FixtureTest):
1935
1936    def test_synonym_options(self):
1937        Address, addresses, users, User = (self.classes.Address,
1938                                           self.tables.addresses,
1939                                           self.tables.users,
1940                                           self.classes.User)
1941
1942        mapper(User, users, properties=dict(
1943            addresses=relationship(mapper(Address, addresses), lazy='select',
1944                                   order_by=addresses.c.id),
1945            adlist=synonym('addresses')))
1946
1947        def go():
1948            sess = create_session()
1949            u = (sess.query(User).
1950                 order_by(User.id).
1951                 options(sa.orm.joinedload('adlist')).
1952                 filter_by(name='jack')).one()
1953            eq_(u.adlist,
1954                [self.static.user_address_result[0].addresses[0]])
1955        self.assert_sql_count(testing.db, go, 1)
1956
1957    def test_eager_options(self):
1958        """A lazy relationship can be upgraded to an eager relationship."""
1959
1960        Address, addresses, users, User = (self.classes.Address,
1961                                           self.tables.addresses,
1962                                           self.tables.users,
1963                                           self.classes.User)
1964
1965        mapper(User, users, properties=dict(
1966            addresses=relationship(mapper(Address, addresses),
1967                                   order_by=addresses.c.id)))
1968
1969        sess = create_session()
1970        result = (sess.query(User).
1971                  order_by(User.id).
1972                  options(sa.orm.joinedload('addresses'))).all()
1973
1974        def go():
1975            eq_(result, self.static.user_address_result)
1976        self.sql_count_(0, go)
1977
1978    def test_eager_options_with_limit(self):
1979        Address, addresses, users, User = (self.classes.Address,
1980                                           self.tables.addresses,
1981                                           self.tables.users,
1982                                           self.classes.User)
1983
1984        mapper(User, users, properties=dict(
1985            addresses=relationship(mapper(Address, addresses), lazy='select')))
1986
1987        sess = create_session()
1988        u = (sess.query(User).
1989             options(sa.orm.joinedload('addresses')).
1990             filter_by(id=8)).one()
1991
1992        def go():
1993            eq_(u.id, 8)
1994            eq_(len(u.addresses), 3)
1995        self.sql_count_(0, go)
1996
1997        sess.expunge_all()
1998
1999        u = sess.query(User).filter_by(id=8).one()
2000        eq_(u.id, 8)
2001        eq_(len(u.addresses), 3)
2002
2003    def test_lazy_options_with_limit(self):
2004        Address, addresses, users, User = (self.classes.Address,
2005                                           self.tables.addresses,
2006                                           self.tables.users,
2007                                           self.classes.User)
2008
2009        mapper(User, users, properties=dict(
2010            addresses=relationship(mapper(Address, addresses), lazy='joined')))
2011
2012        sess = create_session()
2013        u = (sess.query(User).
2014             options(sa.orm.lazyload('addresses')).
2015             filter_by(id=8)).one()
2016
2017        def go():
2018            eq_(u.id, 8)
2019            eq_(len(u.addresses), 3)
2020        self.sql_count_(1, go)
2021
2022    def test_eager_degrade(self):
2023        """An eager relationship automatically degrades to a lazy relationship
2024        if eager columns are not available"""
2025
2026        Address, addresses, users, User = (self.classes.Address,
2027                                           self.tables.addresses,
2028                                           self.tables.users,
2029                                           self.classes.User)
2030
2031        mapper(User, users, properties=dict(
2032            addresses=relationship(mapper(Address, addresses),
2033                                   lazy='joined', order_by=addresses.c.id)))
2034
2035        sess = create_session()
2036        # first test straight eager load, 1 statement
2037
2038        def go():
2039            result = sess.query(User).order_by(User.id).all()
2040            eq_(result, self.static.user_address_result)
2041        self.sql_count_(1, go)
2042
2043        sess.expunge_all()
2044
2045        # then select just from users.  run it into instances.
2046        # then assert the data, which will launch 3 more lazy loads
2047        # (previous users in session fell out of scope and were removed from
2048        # session's identity map)
2049        r = users.select().order_by(users.c.id).execute()
2050
2051        def go():
2052            result = list(sess.query(User).instances(r))
2053            eq_(result, self.static.user_address_result)
2054        self.sql_count_(4, go)
2055
2056    def test_eager_degrade_deep(self):
2057        users, Keyword, items, order_items, orders, \
2058            Item, User, Address, keywords, item_keywords, Order, addresses = (
2059                self.tables.users,
2060                self.classes.Keyword,
2061                self.tables.items,
2062                self.tables.order_items,
2063                self.tables.orders,
2064                self.classes.Item,
2065                self.classes.User,
2066                self.classes.Address,
2067                self.tables.keywords,
2068                self.tables.item_keywords,
2069                self.classes.Order,
2070                self.tables.addresses)
2071
2072        # test with a deeper set of eager loads.  when we first load the three
2073        # users, they will have no addresses or orders.  the number of lazy
2074        # loads when traversing the whole thing will be three for the
2075        # addresses and three for the orders.
2076        mapper(Address, addresses)
2077
2078        mapper(Keyword, keywords)
2079
2080        mapper(Item, items, properties=dict(
2081            keywords=relationship(Keyword, secondary=item_keywords,
2082                                  lazy='joined',
2083                                  order_by=item_keywords.c.keyword_id)))
2084
2085        mapper(Order, orders, properties=dict(
2086            items=relationship(Item, secondary=order_items, lazy='joined',
2087                               order_by=order_items.c.item_id)))
2088
2089        mapper(User, users, properties=dict(
2090            addresses=relationship(Address, lazy='joined',
2091                                   order_by=addresses.c.id),
2092            orders=relationship(Order, lazy='joined',
2093                                order_by=orders.c.id)))
2094
2095        sess = create_session()
2096
2097        # first test straight eager load, 1 statement
2098        def go():
2099            result = sess.query(User).order_by(User.id).all()
2100            eq_(result, self.static.user_all_result)
2101        self.assert_sql_count(testing.db, go, 1)
2102
2103        sess.expunge_all()
2104
2105        # then select just from users.  run it into instances.
2106        # then assert the data, which will launch 6 more lazy loads
2107        r = users.select().execute()
2108
2109        def go():
2110            result = list(sess.query(User).instances(r))
2111            eq_(result, self.static.user_all_result)
2112        self.assert_sql_count(testing.db, go, 6)
2113
2114    def test_lazy_options(self):
2115        """An eager relationship can be upgraded to a lazy relationship."""
2116
2117        Address, addresses, users, User = (self.classes.Address,
2118                                           self.tables.addresses,
2119                                           self.tables.users,
2120                                           self.classes.User)
2121
2122        mapper(User, users, properties=dict(
2123            addresses=relationship(mapper(Address, addresses), lazy='joined')
2124        ))
2125
2126        sess = create_session()
2127        result = (sess.query(User).
2128                  order_by(User.id).
2129                  options(sa.orm.lazyload('addresses'))).all()
2130
2131        def go():
2132            eq_(result, self.static.user_address_result)
2133        self.sql_count_(4, go)
2134
2135    def test_option_propagate(self):
2136        users, items, order_items, Order, Item, User, orders = (
2137            self.tables.users,
2138            self.tables.items,
2139            self.tables.order_items,
2140            self.classes.Order,
2141            self.classes.Item,
2142            self.classes.User,
2143            self.tables.orders)
2144
2145        mapper(User, users, properties=dict(
2146            orders=relationship(Order)
2147        ))
2148        mapper(Order, orders, properties=dict(
2149            items=relationship(Item, secondary=order_items)
2150        ))
2151        mapper(Item, items)
2152
2153        sess = create_session()
2154
2155        oalias = aliased(Order)
2156        opt1 = sa.orm.joinedload(User.orders, Order.items)
2157        opt2 = sa.orm.contains_eager(User.orders, Order.items, alias=oalias)
2158        u1 = sess.query(User).join(oalias, User.orders).\
2159            options(opt1, opt2).first()
2160        ustate = attributes.instance_state(u1)
2161        assert opt1 in ustate.load_options
2162        assert opt2 not in ustate.load_options
2163
2164
2165class DeepOptionsTest(_fixtures.FixtureTest):
2166
2167    @classmethod
2168    def setup_mappers(cls):
2169        users, Keyword, items, order_items, Order, Item, User, \
2170            keywords, item_keywords, orders = (
2171                cls.tables.users,
2172                cls.classes.Keyword,
2173                cls.tables.items,
2174                cls.tables.order_items,
2175                cls.classes.Order,
2176                cls.classes.Item,
2177                cls.classes.User,
2178                cls.tables.keywords,
2179                cls.tables.item_keywords,
2180                cls.tables.orders)
2181
2182        mapper(Keyword, keywords)
2183
2184        mapper(Item, items, properties=dict(
2185            keywords=relationship(Keyword, item_keywords,
2186                                  order_by=item_keywords.c.item_id)))
2187
2188        mapper(Order, orders, properties=dict(
2189            items=relationship(Item, order_items,
2190                               order_by=items.c.id)))
2191
2192        mapper(User, users, properties=dict(
2193            orders=relationship(Order, order_by=orders.c.id)))
2194
2195    def test_deep_options_1(self):
2196        User = self.classes.User
2197
2198        sess = create_session()
2199
2200        # joinedload nothing.
2201        u = sess.query(User).order_by(User.id).all()
2202
2203        def go():
2204            u[0].orders[1].items[0].keywords[1]
2205        self.assert_sql_count(testing.db, go, 3)
2206
2207    def test_deep_options_2(self):
2208        """test (joined|subquery)load_all() options"""
2209
2210        User = self.classes.User
2211
2212        sess = create_session()
2213
2214        result = (sess.query(User).
2215                  order_by(User.id).
2216                  options(
2217                      sa.orm.joinedload_all('orders.items.keywords'))).all()
2218
2219        def go():
2220            result[0].orders[1].items[0].keywords[1]
2221        self.sql_count_(0, go)
2222
2223        sess = create_session()
2224
2225        result = (sess.query(User).
2226                  options(
2227                      sa.orm.subqueryload_all('orders.items.keywords'))).all()
2228
2229        def go():
2230            result[0].orders[1].items[0].keywords[1]
2231        self.sql_count_(0, go)
2232
2233    def test_deep_options_3(self):
2234        User = self.classes.User
2235
2236        sess = create_session()
2237
2238        # same thing, with separate options calls
2239        q2 = (sess.query(User).
2240              order_by(User.id).
2241              options(sa.orm.joinedload('orders')).
2242              options(sa.orm.joinedload('orders.items')).
2243              options(sa.orm.joinedload('orders.items.keywords')))
2244        u = q2.all()
2245
2246        def go():
2247            u[0].orders[1].items[0].keywords[1]
2248        self.sql_count_(0, go)
2249
2250    def test_deep_options_4(self):
2251        Item, User, Order = (self.classes.Item,
2252                             self.classes.User,
2253                             self.classes.Order)
2254
2255        sess = create_session()
2256
2257        assert_raises_message(
2258            sa.exc.ArgumentError,
2259            "Can't find property 'items' on any entity "
2260            "specified in this Query.",
2261            sess.query(User).options, sa.orm.joinedload(Order.items))
2262
2263        # joinedload "keywords" on items.  it will lazy load "orders", then
2264        # lazy load the "items" on the order, but on "items" it will eager
2265        # load the "keywords"
2266        q3 = sess.query(User).order_by(User.id).options(
2267            sa.orm.joinedload('orders.items.keywords'))
2268        u = q3.all()
2269
2270        def go():
2271            u[0].orders[1].items[0].keywords[1]
2272        self.sql_count_(2, go)
2273
2274        sess = create_session()
2275        q3 = sess.query(User).order_by(User.id).options(
2276            sa.orm.joinedload(User.orders, Order.items, Item.keywords))
2277        u = q3.all()
2278
2279        def go():
2280            u[0].orders[1].items[0].keywords[1]
2281        self.sql_count_(2, go)
2282
2283
2284class ComparatorFactoryTest(_fixtures.FixtureTest, AssertsCompiledSQL):
2285
2286    def test_kwarg_accepted(self):
2287        users, Address = self.tables.users, self.classes.Address
2288
2289        class DummyComposite(object):
2290
2291            def __init__(self, x, y):
2292                pass
2293
2294        from sqlalchemy.orm.interfaces import PropComparator
2295
2296        class MyFactory(PropComparator):
2297            pass
2298
2299        for args in (
2300            (column_property, users.c.name),
2301            (deferred, users.c.name),
2302            (synonym, 'name'),
2303            (composite, DummyComposite, users.c.id, users.c.name),
2304            (relationship, Address),
2305            (backref, 'address'),
2306            (comparable_property, ),
2307            (dynamic_loader, Address)
2308        ):
2309            fn = args[0]
2310            args = args[1:]
2311            fn(comparator_factory=MyFactory, *args)
2312
2313    def test_column(self):
2314        User, users = self.classes.User, self.tables.users
2315
2316        from sqlalchemy.orm.properties import ColumnProperty
2317
2318        class MyFactory(ColumnProperty.Comparator):
2319            __hash__ = None
2320
2321            def __eq__(self, other):
2322                return func.foobar(self.__clause_element__()) == \
2323                    func.foobar(other)
2324        mapper(
2325            User, users,
2326            properties={
2327                'name': column_property(
2328                    users.c.name, comparator_factory=MyFactory)})
2329        self.assert_compile(
2330            User.name == 'ed',
2331            "foobar(users.name) = foobar(:foobar_1)",
2332            dialect=default.DefaultDialect()
2333        )
2334        self.assert_compile(
2335            aliased(User).name == 'ed',
2336            "foobar(users_1.name) = foobar(:foobar_1)",
2337            dialect=default.DefaultDialect())
2338
2339    def test_synonym(self):
2340        users, User = self.tables.users, self.classes.User
2341
2342        from sqlalchemy.orm.properties import ColumnProperty
2343
2344        class MyFactory(ColumnProperty.Comparator):
2345            __hash__ = None
2346
2347            def __eq__(self, other):
2348                return func.foobar(self.__clause_element__()) ==\
2349                    func.foobar(other)
2350
2351        mapper(User, users, properties={
2352            'name': synonym('_name', map_column=True,
2353                            comparator_factory=MyFactory)
2354        })
2355        self.assert_compile(
2356            User.name == 'ed',
2357            "foobar(users.name) = foobar(:foobar_1)",
2358            dialect=default.DefaultDialect())
2359
2360        self.assert_compile(
2361            aliased(User).name == 'ed',
2362            "foobar(users_1.name) = foobar(:foobar_1)",
2363            dialect=default.DefaultDialect())
2364
2365    def test_relationship(self):
2366        users, Address, addresses, User = (self.tables.users,
2367                                           self.classes.Address,
2368                                           self.tables.addresses,
2369                                           self.classes.User)
2370
2371        from sqlalchemy.orm.properties import RelationshipProperty
2372
2373        # NOTE: this API changed in 0.8, previously __clause_element__()
2374        # gave the parent selecatable, now it gives the
2375        # primaryjoin/secondaryjoin
2376        class MyFactory(RelationshipProperty.Comparator):
2377            __hash__ = None
2378
2379            def __eq__(self, other):
2380                return func.foobar(self._source_selectable().c.user_id) == \
2381                    func.foobar(other.id)
2382
2383        class MyFactory2(RelationshipProperty.Comparator):
2384            __hash__ = None
2385
2386            def __eq__(self, other):
2387                return func.foobar(self._source_selectable().c.id) == \
2388                    func.foobar(other.user_id)
2389
2390        mapper(User, users)
2391        mapper(Address, addresses, properties={
2392            'user': relationship(
2393                User, comparator_factory=MyFactory,
2394                backref=backref("addresses", comparator_factory=MyFactory2)
2395            )
2396        }
2397        )
2398
2399        # these are kind of nonsensical tests.
2400        self.assert_compile(Address.user == User(id=5),
2401                            "foobar(addresses.user_id) = foobar(:foobar_1)",
2402                            dialect=default.DefaultDialect())
2403        self.assert_compile(User.addresses == Address(id=5, user_id=7),
2404                            "foobar(users.id) = foobar(:foobar_1)",
2405                            dialect=default.DefaultDialect())
2406
2407        self.assert_compile(
2408            aliased(Address).user == User(id=5),
2409            "foobar(addresses_1.user_id) = foobar(:foobar_1)",
2410            dialect=default.DefaultDialect())
2411
2412        self.assert_compile(
2413            aliased(User).addresses == Address(id=5, user_id=7),
2414            "foobar(users_1.id) = foobar(:foobar_1)",
2415            dialect=default.DefaultDialect())
2416
2417
2418class SecondaryOptionsTest(fixtures.MappedTest):
2419
2420    """test that the contains_eager() option doesn't bleed
2421    into a secondary load."""
2422
2423    run_inserts = 'once'
2424
2425    run_deletes = None
2426
2427    @classmethod
2428    def define_tables(cls, metadata):
2429        Table("base", metadata,
2430              Column('id', Integer, primary_key=True),
2431              Column('type', String(50), nullable=False)
2432              )
2433        Table("child1", metadata,
2434              Column('id', Integer, ForeignKey('base.id'), primary_key=True),
2435              Column(
2436                  'child2id', Integer, ForeignKey('child2.id'), nullable=False)
2437              )
2438        Table("child2", metadata,
2439              Column('id', Integer, ForeignKey('base.id'), primary_key=True),
2440              )
2441        Table('related', metadata,
2442              Column('id', Integer, ForeignKey('base.id'), primary_key=True),
2443              )
2444
2445    @classmethod
2446    def setup_mappers(cls):
2447        child1, child2, base, related = (cls.tables.child1,
2448                                         cls.tables.child2,
2449                                         cls.tables.base,
2450                                         cls.tables.related)
2451
2452        class Base(cls.Comparable):
2453            pass
2454
2455        class Child1(Base):
2456            pass
2457
2458        class Child2(Base):
2459            pass
2460
2461        class Related(cls.Comparable):
2462            pass
2463        mapper(Base, base, polymorphic_on=base.c.type, properties={
2464            'related': relationship(Related, uselist=False)
2465        })
2466        mapper(Child1, child1, inherits=Base,
2467               polymorphic_identity='child1',
2468               properties={
2469                   'child2': relationship(
2470                       Child2,
2471                       primaryjoin=child1.c.child2id == base.c.id,
2472                       foreign_keys=child1.c.child2id)
2473               })
2474        mapper(Child2, child2, inherits=Base, polymorphic_identity='child2')
2475        mapper(Related, related)
2476
2477    @classmethod
2478    def insert_data(cls):
2479        child1, child2, base, related = (cls.tables.child1,
2480                                         cls.tables.child2,
2481                                         cls.tables.base,
2482                                         cls.tables.related)
2483
2484        base.insert().execute([
2485            {'id': 1, 'type': 'child1'},
2486            {'id': 2, 'type': 'child1'},
2487            {'id': 3, 'type': 'child1'},
2488            {'id': 4, 'type': 'child2'},
2489            {'id': 5, 'type': 'child2'},
2490            {'id': 6, 'type': 'child2'},
2491        ])
2492        child2.insert().execute([
2493            {'id': 4},
2494            {'id': 5},
2495            {'id': 6},
2496        ])
2497        child1.insert().execute([
2498            {'id': 1, 'child2id': 4},
2499            {'id': 2, 'child2id': 5},
2500            {'id': 3, 'child2id': 6},
2501        ])
2502        related.insert().execute([
2503            {'id': 1},
2504            {'id': 2},
2505            {'id': 3},
2506            {'id': 4},
2507            {'id': 5},
2508            {'id': 6},
2509        ])
2510
2511    def test_contains_eager(self):
2512        Child1, Related = self.classes.Child1, self.classes.Related
2513
2514        sess = create_session()
2515
2516        child1s = sess.query(Child1).\
2517            join(Child1.related).\
2518            options(sa.orm.contains_eager(Child1.related)).\
2519            order_by(Child1.id)
2520
2521        def go():
2522            eq_(
2523                child1s.all(),
2524                [
2525                    Child1(id=1, related=Related(id=1)),
2526                    Child1(id=2, related=Related(id=2)),
2527                    Child1(id=3, related=Related(id=3))
2528                ]
2529            )
2530        self.assert_sql_count(testing.db, go, 1)
2531
2532        c1 = child1s[0]
2533
2534        self.assert_sql_execution(
2535            testing.db,
2536            lambda: c1.child2,
2537            CompiledSQL(
2538                "SELECT child2.id AS child2_id, base.id AS base_id, "
2539                "base.type AS base_type "
2540                "FROM base JOIN child2 ON base.id = child2.id "
2541                "WHERE base.id = :param_1",
2542                {'param_1': 4}
2543            )
2544        )
2545
2546    def test_joinedload_on_other(self):
2547        Child1, Related = self.classes.Child1, self.classes.Related
2548
2549        sess = create_session()
2550
2551        child1s = sess.query(Child1).join(Child1.related).options(
2552            sa.orm.joinedload(Child1.related)).order_by(Child1.id)
2553
2554        def go():
2555            eq_(
2556                child1s.all(),
2557                [Child1(id=1, related=Related(id=1)),
2558                 Child1(id=2, related=Related(id=2)),
2559                 Child1(id=3, related=Related(id=3))]
2560            )
2561        self.assert_sql_count(testing.db, go, 1)
2562
2563        c1 = child1s[0]
2564
2565        self.assert_sql_execution(
2566            testing.db,
2567            lambda: c1.child2,
2568            CompiledSQL(
2569                "SELECT child2.id AS child2_id, base.id AS base_id, "
2570                "base.type AS base_type "
2571                "FROM base JOIN child2 ON base.id = child2.id "
2572                "WHERE base.id = :param_1",
2573
2574                {'param_1': 4}
2575            )
2576        )
2577
2578    def test_joinedload_on_same(self):
2579        Child1, Child2, Related = (self.classes.Child1,
2580                                   self.classes.Child2,
2581                                   self.classes.Related)
2582
2583        sess = create_session()
2584
2585        child1s = sess.query(Child1).join(Child1.related).options(
2586            sa.orm.joinedload(Child1.child2, Child2.related)
2587        ).order_by(Child1.id)
2588
2589        def go():
2590            eq_(
2591                child1s.all(),
2592                [Child1(id=1, related=Related(id=1)),
2593                 Child1(id=2, related=Related(id=2)),
2594                 Child1(id=3, related=Related(id=3))]
2595            )
2596        self.assert_sql_count(testing.db, go, 4)
2597
2598        c1 = child1s[0]
2599
2600        # this *does* joinedload
2601        self.assert_sql_execution(
2602            testing.db,
2603            lambda: c1.child2,
2604            CompiledSQL(
2605                "SELECT child2.id AS child2_id, base.id AS base_id, "
2606                "base.type AS base_type, "
2607                "related_1.id AS related_1_id FROM base JOIN child2 "
2608                "ON base.id = child2.id "
2609                "LEFT OUTER JOIN related AS related_1 "
2610                "ON base.id = related_1.id WHERE base.id = :param_1",
2611                {'param_1': 4}
2612            )
2613        )
2614
2615
2616class DeferredPopulationTest(fixtures.MappedTest):
2617
2618    @classmethod
2619    def define_tables(cls, metadata):
2620        Table("thing", metadata,
2621              Column(
2622                  "id", Integer, primary_key=True,
2623                  test_needs_autoincrement=True),
2624              Column("name", String(20)))
2625
2626        Table("human", metadata,
2627              Column(
2628                  "id", Integer, primary_key=True,
2629                  test_needs_autoincrement=True),
2630              Column("thing_id", Integer, ForeignKey("thing.id")),
2631              Column("name", String(20)))
2632
2633    @classmethod
2634    def setup_mappers(cls):
2635        thing, human = cls.tables.thing, cls.tables.human
2636
2637        class Human(cls.Basic):
2638            pass
2639
2640        class Thing(cls.Basic):
2641            pass
2642
2643        mapper(Human, human, properties={"thing": relationship(Thing)})
2644        mapper(Thing, thing, properties={"name": deferred(thing.c.name)})
2645
2646    @classmethod
2647    def insert_data(cls):
2648        thing, human = cls.tables.thing, cls.tables.human
2649
2650        thing.insert().execute([
2651            {"id": 1, "name": "Chair"},
2652        ])
2653
2654        human.insert().execute([
2655            {"id": 1, "thing_id": 1, "name": "Clark Kent"},
2656        ])
2657
2658    def _test(self, thing):
2659        assert "name" in attributes.instance_state(thing).dict
2660
2661    def test_no_previous_query(self):
2662        Thing = self.classes.Thing
2663
2664        session = create_session()
2665        thing = session.query(Thing).options(sa.orm.undefer("name")).first()
2666        self._test(thing)
2667
2668    def test_query_twice_with_clear(self):
2669        Thing = self.classes.Thing
2670
2671        session = create_session()
2672        result = session.query(Thing).first()  # noqa
2673        session.expunge_all()
2674        thing = session.query(Thing).options(sa.orm.undefer("name")).first()
2675        self._test(thing)
2676
2677    def test_query_twice_no_clear(self):
2678        Thing = self.classes.Thing
2679
2680        session = create_session()
2681        result = session.query(Thing).first()    # noqa
2682        thing = session.query(Thing).options(sa.orm.undefer("name")).first()
2683        self._test(thing)
2684
2685    def test_joinedload_with_clear(self):
2686        Thing, Human = self.classes.Thing, self.classes.Human
2687
2688        session = create_session()
2689        human = session.query(Human).options(    # noqa
2690            sa.orm.joinedload("thing")).first()
2691        session.expunge_all()
2692        thing = session.query(Thing).options(sa.orm.undefer("name")).first()
2693        self._test(thing)
2694
2695    def test_joinedload_no_clear(self):
2696        Thing, Human = self.classes.Thing, self.classes.Human
2697
2698        session = create_session()
2699        human = session.query(Human).options(    # noqa
2700            sa.orm.joinedload("thing")).first()
2701        thing = session.query(Thing).options(sa.orm.undefer("name")).first()
2702        self._test(thing)
2703
2704    def test_join_with_clear(self):
2705        Thing, Human = self.classes.Thing, self.classes.Human
2706
2707        session = create_session()
2708        result = session.query(Human).add_entity(  # noqa
2709            Thing).join("thing").first()
2710        session.expunge_all()
2711        thing = session.query(Thing).options(sa.orm.undefer("name")).first()
2712        self._test(thing)
2713
2714    def test_join_no_clear(self):
2715        Thing, Human = self.classes.Thing, self.classes.Human
2716
2717        session = create_session()
2718        result = session.query(Human).add_entity(  # noqa
2719            Thing).join("thing").first()
2720        thing = session.query(Thing).options(sa.orm.undefer("name")).first()
2721        self._test(thing)
2722
2723
2724class NoLoadTest(_fixtures.FixtureTest):
2725    run_inserts = 'once'
2726    run_deletes = None
2727
2728    def test_o2m_noload(self):
2729
2730        Address, addresses, users, User = (
2731            self.classes.Address,
2732            self.tables.addresses,
2733            self.tables.users,
2734            self.classes.User)
2735
2736        m = mapper(User, users, properties=dict(
2737            addresses=relationship(mapper(Address, addresses), lazy='noload')
2738        ))
2739        q = create_session().query(m)
2740        result = [None]
2741
2742        def go():
2743            x = q.filter(User.id == 7).all()
2744            x[0].addresses
2745            result[0] = x
2746        self.assert_sql_count(testing.db, go, 1)
2747
2748        self.assert_result(
2749            result[0], User,
2750            {'id': 7, 'addresses': (Address, [])},
2751        )
2752
2753    def test_upgrade_o2m_noload_lazyload_option(self):
2754        Address, addresses, users, User = (
2755            self.classes.Address,
2756            self.tables.addresses,
2757            self.tables.users,
2758            self.classes.User)
2759
2760        m = mapper(User, users, properties=dict(
2761            addresses=relationship(mapper(Address, addresses), lazy='noload')
2762        ))
2763        q = create_session().query(m).options(sa.orm.lazyload('addresses'))
2764        result = [None]
2765
2766        def go():
2767            x = q.filter(User.id == 7).all()
2768            x[0].addresses
2769            result[0] = x
2770        self.sql_count_(2, go)
2771
2772        self.assert_result(
2773            result[0], User,
2774            {'id': 7, 'addresses': (Address, [{'id': 1}])},
2775        )
2776
2777    def test_m2o_noload_option(self):
2778        Address, addresses, users, User = (
2779            self.classes.Address,
2780            self.tables.addresses,
2781            self.tables.users,
2782            self.classes.User)
2783        mapper(Address, addresses, properties={
2784            'user': relationship(User)
2785        })
2786        mapper(User, users)
2787        s = Session()
2788        a1 = s.query(Address).filter_by(id=1).options(
2789            sa.orm.noload('user')).first()
2790
2791        def go():
2792            eq_(a1.user, None)
2793        self.sql_count_(0, go)
2794
2795
2796class RaiseLoadTest(_fixtures.FixtureTest):
2797    run_inserts = 'once'
2798    run_deletes = None
2799
2800    def test_o2m_raiseload_mapper(self):
2801        Address, addresses, users, User = (
2802            self.classes.Address,
2803            self.tables.addresses,
2804            self.tables.users,
2805            self.classes.User)
2806
2807        mapper(Address, addresses)
2808        mapper(User, users, properties=dict(
2809            addresses=relationship(Address, lazy='raise')
2810        ))
2811        q = create_session().query(User)
2812        result = [None]
2813
2814        def go():
2815            x = q.filter(User.id == 7).all()
2816            assert_raises_message(
2817                sa.exc.InvalidRequestError,
2818                "'User.addresses' is not available due to lazy='raise'",
2819                lambda: x[0].addresses)
2820            result[0] = x
2821        self.assert_sql_count(testing.db, go, 1)
2822
2823        self.assert_result(
2824            result[0], User,
2825            {'id': 7},
2826        )
2827
2828    def test_o2m_raiseload_option(self):
2829        Address, addresses, users, User = (
2830            self.classes.Address,
2831            self.tables.addresses,
2832            self.tables.users,
2833            self.classes.User)
2834
2835        mapper(Address, addresses)
2836        mapper(User, users, properties=dict(
2837            addresses=relationship(Address)
2838        ))
2839        q = create_session().query(User)
2840        result = [None]
2841
2842        def go():
2843            x = q.options(
2844                sa.orm.raiseload(User.addresses)).filter(User.id == 7).all()
2845            assert_raises_message(
2846                sa.exc.InvalidRequestError,
2847                "'User.addresses' is not available due to lazy='raise'",
2848                lambda: x[0].addresses)
2849            result[0] = x
2850        self.assert_sql_count(testing.db, go, 1)
2851
2852        self.assert_result(
2853            result[0], User,
2854            {'id': 7},
2855        )
2856
2857    def test_o2m_raiseload_lazyload_option(self):
2858        Address, addresses, users, User = (
2859            self.classes.Address,
2860            self.tables.addresses,
2861            self.tables.users,
2862            self.classes.User)
2863
2864        mapper(Address, addresses)
2865        mapper(User, users, properties=dict(
2866            addresses=relationship(Address, lazy='raise')
2867        ))
2868        q = create_session().query(User).options(sa.orm.lazyload('addresses'))
2869        result = [None]
2870
2871        def go():
2872            x = q.filter(User.id == 7).all()
2873            x[0].addresses
2874            result[0] = x
2875        self.sql_count_(2, go)
2876
2877        self.assert_result(
2878            result[0], User,
2879            {'id': 7, 'addresses': (Address, [{'id': 1}])},
2880        )
2881
2882    def test_m2o_raiseload_option(self):
2883        Address, addresses, users, User = (
2884            self.classes.Address,
2885            self.tables.addresses,
2886            self.tables.users,
2887            self.classes.User)
2888        mapper(Address, addresses, properties={
2889            'user': relationship(User)
2890        })
2891        mapper(User, users)
2892        s = Session()
2893        a1 = s.query(Address).filter_by(id=1).options(
2894            sa.orm.raiseload('user')).first()
2895
2896        def go():
2897            assert_raises_message(
2898                sa.exc.InvalidRequestError,
2899                "'Address.user' is not available due to lazy='raise'",
2900                lambda: a1.user)
2901
2902        self.sql_count_(0, go)
2903
2904    def test_m2o_raise_on_sql_option(self):
2905        Address, addresses, users, User = (
2906            self.classes.Address,
2907            self.tables.addresses,
2908            self.tables.users,
2909            self.classes.User)
2910        mapper(Address, addresses, properties={
2911            'user': relationship(User)
2912        })
2913        mapper(User, users)
2914        s = Session()
2915        a1 = s.query(Address).filter_by(id=1).options(
2916            sa.orm.raiseload('user', sql_only=True)).first()
2917
2918        def go():
2919            assert_raises_message(
2920                sa.exc.InvalidRequestError,
2921                "'Address.user' is not available due to lazy='raise_on_sql'",
2922                lambda: a1.user)
2923
2924        self.sql_count_(0, go)
2925
2926        s.close()
2927
2928        u1 = s.query(User).first()
2929        a1 = s.query(Address).filter_by(id=1).options(
2930            sa.orm.raiseload('user', sql_only=True)).first()
2931        assert 'user' not in a1.__dict__
2932        is_(a1.user, u1)
2933
2934    def test_m2o_non_use_get_raise_on_sql_option(self):
2935        Address, addresses, users, User = (
2936            self.classes.Address,
2937            self.tables.addresses,
2938            self.tables.users,
2939            self.classes.User)
2940        mapper(Address, addresses, properties={
2941            'user': relationship(
2942                User,
2943                primaryjoin=sa.and_(
2944                    addresses.c.user_id == users.c.id,
2945                    users.c.name != None  # noqa
2946                )
2947            )
2948        })
2949        mapper(User, users)
2950        s = Session()
2951        u1 = s.query(User).first()
2952        a1 = s.query(Address).filter_by(id=1).options(
2953            sa.orm.raiseload('user', sql_only=True)).first()
2954
2955        def go():
2956            assert_raises_message(
2957                sa.exc.InvalidRequestError,
2958                "'Address.user' is not available due to lazy='raise_on_sql'",
2959                lambda: a1.user)
2960
2961
2962class RequirementsTest(fixtures.MappedTest):
2963
2964    """Tests the contract for user classes."""
2965
2966    @classmethod
2967    def define_tables(cls, metadata):
2968        Table('ht1', metadata,
2969              Column(
2970                  'id', Integer, primary_key=True,
2971                  test_needs_autoincrement=True),
2972              Column('value', String(10)))
2973        Table('ht2', metadata,
2974              Column(
2975                  'id', Integer, primary_key=True,
2976                  test_needs_autoincrement=True),
2977              Column('ht1_id', Integer, ForeignKey('ht1.id')),
2978              Column('value', String(10)))
2979        Table('ht3', metadata,
2980              Column(
2981                  'id', Integer, primary_key=True,
2982                  test_needs_autoincrement=True),
2983              Column('value', String(10)))
2984        Table('ht4', metadata,
2985              Column('ht1_id', Integer, ForeignKey('ht1.id'),
2986                     primary_key=True),
2987              Column('ht3_id', Integer, ForeignKey('ht3.id'),
2988                     primary_key=True))
2989        Table('ht5', metadata,
2990              Column('ht1_id', Integer, ForeignKey('ht1.id'),
2991                     primary_key=True))
2992        Table('ht6', metadata,
2993              Column('ht1a_id', Integer, ForeignKey('ht1.id'),
2994                     primary_key=True),
2995              Column('ht1b_id', Integer, ForeignKey('ht1.id'),
2996                     primary_key=True),
2997              Column('value', String(10)))
2998
2999    if util.py2k:
3000        def test_baseclass(self):
3001            ht1 = self.tables.ht1
3002
3003            class OldStyle:
3004                pass
3005
3006            assert_raises(sa.exc.ArgumentError, mapper, OldStyle, ht1)
3007
3008            assert_raises(sa.exc.ArgumentError, mapper, 123)
3009
3010            class NoWeakrefSupport(str):
3011                pass
3012
3013            # TODO: is weakref support detectable without an instance?
3014            # self.assertRaises(
3015            #  sa.exc.ArgumentError, mapper, NoWeakrefSupport, t2)
3016
3017    class _ValueBase(object):
3018
3019        def __init__(self, value='abc', id=None):
3020            self.id = id
3021            self.value = value
3022
3023        def __bool__(self):
3024            return False
3025
3026        def __hash__(self):
3027            return hash(self.value)
3028
3029        def __eq__(self, other):
3030            if isinstance(other, type(self)):
3031                return self.value == other.value
3032            return False
3033
3034    def test_comparison_overrides(self):
3035        """Simple tests to ensure users can supply comparison __methods__.
3036
3037        The suite-level test --options are better suited to detect
3038        problems- they add selected __methods__ across the board on all
3039        ORM tests.  This test simply shoves a variety of operations
3040        through the ORM to catch basic regressions early in a standard
3041        test run.
3042        """
3043
3044        ht6, ht5, ht4, ht3, ht2, ht1 = (self.tables.ht6,
3045                                        self.tables.ht5,
3046                                        self.tables.ht4,
3047                                        self.tables.ht3,
3048                                        self.tables.ht2,
3049                                        self.tables.ht1)
3050
3051        class H1(self._ValueBase):
3052            pass
3053
3054        class H2(self._ValueBase):
3055            pass
3056
3057        class H3(self._ValueBase):
3058            pass
3059
3060        class H6(self._ValueBase):
3061            pass
3062
3063        mapper(H1, ht1, properties={
3064            'h2s': relationship(H2, backref='h1'),
3065            'h3s': relationship(H3, secondary=ht4, backref='h1s'),
3066            'h1s': relationship(H1, secondary=ht5, backref='parent_h1'),
3067            't6a': relationship(H6, backref='h1a',
3068                                primaryjoin=ht1.c.id == ht6.c.ht1a_id),
3069            't6b': relationship(H6, backref='h1b',
3070                                primaryjoin=ht1.c.id == ht6.c.ht1b_id),
3071        })
3072        mapper(H2, ht2)
3073        mapper(H3, ht3)
3074        mapper(H6, ht6)
3075
3076        s = create_session()
3077        s.add_all([
3078            H1('abc'),
3079            H1('def'),
3080        ])
3081        h1 = H1('ghi')
3082        s.add(h1)
3083        h1.h2s.append(H2('abc'))
3084        h1.h3s.extend([H3(), H3()])
3085        h1.h1s.append(H1())
3086
3087        s.flush()
3088        eq_(select([func.count('*')]).select_from(ht1).scalar(), 4)
3089
3090        h6 = H6()
3091        h6.h1a = h1
3092        h6.h1b = h1
3093
3094        h6 = H6()
3095        h6.h1a = h1
3096        h6.h1b = x = H1()
3097        assert x in s
3098
3099        h6.h1b.h2s.append(H2('def'))
3100
3101        s.flush()
3102
3103        h1.h2s.extend([H2('abc'), H2('def')])
3104        s.flush()
3105
3106        h1s = s.query(H1).options(sa.orm.joinedload('h2s')).all()
3107        eq_(len(h1s), 5)
3108
3109        self.assert_unordered_result(h1s, H1,
3110                                     {'h2s': []},
3111                                     {'h2s': []},
3112                                     {'h2s': (H2, [{'value': 'abc'},
3113                                                   {'value': 'def'},
3114                                                   {'value': 'abc'}])},
3115                                     {'h2s': []},
3116                                     {'h2s': (H2, [{'value': 'def'}])})
3117
3118        h1s = s.query(H1).options(sa.orm.joinedload('h3s')).all()
3119
3120        eq_(len(h1s), 5)
3121        h1s = s.query(H1).options(sa.orm.joinedload_all('t6a.h1b'),
3122                                  sa.orm.joinedload('h2s'),
3123                                  sa.orm.joinedload_all('h3s.h1s')).all()
3124        eq_(len(h1s), 5)
3125
3126    def test_composite_results(self):
3127        ht2, ht1 = (self.tables.ht2,
3128                    self.tables.ht1)
3129
3130        class H1(self._ValueBase):
3131
3132            def __init__(self, value, id, h2s):
3133                self.value = value
3134                self.id = id
3135                self.h2s = h2s
3136
3137        class H2(self._ValueBase):
3138
3139            def __init__(self, value, id):
3140                self.value = value
3141                self.id = id
3142
3143        mapper(H1, ht1, properties={
3144            'h2s': relationship(H2, backref='h1'),
3145        })
3146        mapper(H2, ht2)
3147        s = Session()
3148        s.add_all([
3149            H1('abc', 1, h2s=[
3150                H2('abc', id=1),
3151                H2('def', id=2),
3152                H2('def', id=3),
3153            ]),
3154            H1('def', 2, h2s=[
3155                H2('abc', id=4),
3156                H2('abc', id=5),
3157                H2('def', id=6),
3158            ]),
3159        ])
3160        s.commit()
3161        eq_(
3162            [(h1.value, h1.id, h2.value, h2.id)
3163             for h1, h2 in
3164             s.query(H1, H2).join(H1.h2s).order_by(H1.id, H2.id)],
3165            [
3166                ('abc', 1, 'abc', 1),
3167                ('abc', 1, 'def', 2),
3168                ('abc', 1, 'def', 3),
3169                ('def', 2, 'abc', 4),
3170                ('def', 2, 'abc', 5),
3171                ('def', 2, 'def', 6),
3172            ]
3173        )
3174
3175    def test_nonzero_len_recursion(self):
3176        ht1 = self.tables.ht1
3177
3178        class H1(object):
3179
3180            def __len__(self):
3181                return len(self.get_value())
3182
3183            def get_value(self):
3184                self.value = "foobar"
3185                return self.value
3186
3187        class H2(object):
3188
3189            def __bool__(self):
3190                return bool(self.get_value())
3191
3192            def get_value(self):
3193                self.value = "foobar"
3194                return self.value
3195
3196        mapper(H1, ht1)
3197        mapper(H2, ht1)
3198
3199        h1 = H1()
3200        h1.value = "Asdf"
3201        h1.value = "asdf asdf"  # ding
3202
3203        h2 = H2()
3204        h2.value = "Asdf"
3205        h2.value = "asdf asdf"  # ding
3206
3207
3208class IsUserlandTest(fixtures.MappedTest):
3209
3210    @classmethod
3211    def define_tables(cls, metadata):
3212        Table('foo', metadata,
3213              Column('id', Integer, primary_key=True),
3214              Column('someprop', Integer)
3215              )
3216
3217    def _test(self, value, instancelevel=None):
3218        class Foo(object):
3219            someprop = value
3220
3221        m = mapper(Foo, self.tables.foo)
3222        eq_(Foo.someprop, value)
3223        f1 = Foo()
3224        if instancelevel is not None:
3225            eq_(f1.someprop, instancelevel)
3226        else:
3227            eq_(f1.someprop, value)
3228        assert self.tables.foo.c.someprop not in m._columntoproperty
3229
3230    def _test_not(self, value):
3231        class Foo(object):
3232            someprop = value
3233
3234        m = mapper(Foo, self.tables.foo)
3235        is_(Foo.someprop.property.columns[0], self.tables.foo.c.someprop)
3236        assert self.tables.foo.c.someprop in m._columntoproperty
3237
3238    def test_string(self):
3239        self._test("someprop")
3240
3241    def test_unicode(self):
3242        self._test("someprop")
3243
3244    def test_int(self):
3245        self._test(5)
3246
3247    def test_dict(self):
3248        self._test({"bar": "bat"})
3249
3250    def test_set(self):
3251        self._test(set([6]))
3252
3253    def test_column(self):
3254        self._test_not(self.tables.foo.c.someprop)
3255
3256    def test_relationship(self):
3257        self._test_not(relationship("bar"))
3258
3259    def test_descriptor(self):
3260        def somefunc(self):
3261            return "hi"
3262        self._test(property(somefunc), "hi")
3263
3264
3265class MagicNamesTest(fixtures.MappedTest):
3266
3267    @classmethod
3268    def define_tables(cls, metadata):
3269        Table('cartographers', metadata,
3270              Column('id', Integer, primary_key=True,
3271                     test_needs_autoincrement=True),
3272              Column('name', String(50)),
3273              Column('alias', String(50)),
3274              Column('quip', String(100)))
3275        Table('maps', metadata,
3276              Column('id', Integer, primary_key=True,
3277                     test_needs_autoincrement=True),
3278              Column('cart_id', Integer,
3279                     ForeignKey('cartographers.id')),
3280              Column('state', String(2)),
3281              Column('data', sa.Text))
3282
3283    @classmethod
3284    def setup_classes(cls):
3285        class Cartographer(cls.Basic):
3286            pass
3287
3288        class Map(cls.Basic):
3289            pass
3290
3291    def test_mappish(self):
3292        maps, Cartographer, cartographers, Map = (self.tables.maps,
3293                                                  self.classes.Cartographer,
3294                                                  self.tables.cartographers,
3295                                                  self.classes.Map)
3296
3297        mapper(Cartographer, cartographers, properties=dict(
3298            query=cartographers.c.quip))
3299        mapper(Map, maps, properties=dict(
3300            mapper=relationship(Cartographer, backref='maps')))
3301
3302        c = Cartographer(name='Lenny', alias='The Dude',
3303                         query='Where be dragons?')
3304        Map(state='AK', mapper=c)
3305
3306        sess = create_session()
3307        sess.add(c)
3308        sess.flush()
3309        sess.expunge_all()
3310
3311        for C, M in ((Cartographer, Map),
3312                     (sa.orm.aliased(Cartographer), sa.orm.aliased(Map))):
3313            c1 = (sess.query(C).
3314                  filter(C.alias == 'The Dude').
3315                  filter(C.query == 'Where be dragons?')).one()
3316            sess.query(M).filter(M.mapper == c1).one()
3317
3318    def test_direct_stateish(self):
3319        for reserved in (sa.orm.instrumentation.ClassManager.STATE_ATTR,
3320                         sa.orm.instrumentation.ClassManager.MANAGER_ATTR):
3321            t = Table('t', sa.MetaData(),
3322                      Column('id', Integer, primary_key=True,
3323                             test_needs_autoincrement=True),
3324                      Column(reserved, Integer))
3325
3326            class T(object):
3327                pass
3328            assert_raises_message(
3329                KeyError,
3330                ('%r: requested attribute name conflicts with '
3331                 'instrumentation attribute of the same name.' % reserved),
3332                mapper, T, t)
3333
3334    def test_indirect_stateish(self):
3335        maps = self.tables.maps
3336
3337        for reserved in (sa.orm.instrumentation.ClassManager.STATE_ATTR,
3338                         sa.orm.instrumentation.ClassManager.MANAGER_ATTR):
3339            class M(object):
3340                pass
3341
3342            assert_raises_message(
3343                KeyError,
3344                ('requested attribute name conflicts with '
3345                 'instrumentation attribute of the same name'),
3346                mapper, M, maps, properties={
3347                    reserved: maps.c.state})
3348