1import sqlalchemy as sa
2from sqlalchemy import CheckConstraint
3from sqlalchemy import event
4from sqlalchemy import exc
5from sqlalchemy import ForeignKey
6from sqlalchemy import ForeignKeyConstraint
7from sqlalchemy import Index
8from sqlalchemy import inspect
9from sqlalchemy import Integer
10from sqlalchemy import MetaData
11from sqlalchemy import String
12from sqlalchemy import testing
13from sqlalchemy import UniqueConstraint
14from sqlalchemy import util
15from sqlalchemy.ext import declarative as decl
16from sqlalchemy.ext.declarative import DeclarativeMeta
17from sqlalchemy.ext.declarative import declared_attr
18from sqlalchemy.ext.declarative import synonym_for
19from sqlalchemy.ext.declarative.base import _DeferredMapperConfig
20from sqlalchemy.ext.hybrid import hybrid_property
21from sqlalchemy.orm import backref
22from sqlalchemy.orm import class_mapper
23from sqlalchemy.orm import clear_mappers
24from sqlalchemy.orm import close_all_sessions
25from sqlalchemy.orm import column_property
26from sqlalchemy.orm import composite
27from sqlalchemy.orm import configure_mappers
28from sqlalchemy.orm import create_session
29from sqlalchemy.orm import deferred
30from sqlalchemy.orm import exc as orm_exc
31from sqlalchemy.orm import joinedload
32from sqlalchemy.orm import mapper
33from sqlalchemy.orm import properties
34from sqlalchemy.orm import relationship
35from sqlalchemy.orm import Session
36from sqlalchemy.orm.events import MapperEvents
37from sqlalchemy.testing import assert_raises
38from sqlalchemy.testing import assert_raises_message
39from sqlalchemy.testing import assertions
40from sqlalchemy.testing import eq_
41from sqlalchemy.testing import expect_warnings
42from sqlalchemy.testing import fixtures
43from sqlalchemy.testing import is_
44from sqlalchemy.testing import mock
45from sqlalchemy.testing.schema import Column
46from sqlalchemy.testing.schema import Table
47from sqlalchemy.util import with_metaclass
48
49
50Base = None
51
52User = Address = None
53
54
class DeclarativeTestBase(
    fixtures.TestBase,
    testing.AssertsExecutionResults,
    testing.AssertsCompiledSQL,
):
    """Shared fixture for the declarative tests in this module.

    ``setup`` publishes a new declarative base, bound to ``testing.db``,
    through the module-level ``Base`` global; ``teardown`` disposes of the
    sessions, mappers and tables created while the test ran.
    """

    # NOTE(review): presumably the dialect assert_compile() renders with,
    # via the AssertsCompiledSQL mixin -- confirm against the mixin.
    __dialect__ = "default"

    def setup(self):
        """Rebind the module-level ``Base`` to a new declarative base."""
        global Base
        Base = decl.declarative_base(testing.db)

    def teardown(self):
        """Close all sessions, clear all mappers, then drop Base's tables."""
        close_all_sessions()
        clear_mappers()
        Base.metadata.drop_all()
71
72class DeclarativeTest(DeclarativeTestBase):
73    def test_basic(self):
74        class User(Base, fixtures.ComparableEntity):
75            __tablename__ = "users"
76
77            id = Column(
78                "id", Integer, primary_key=True, test_needs_autoincrement=True
79            )
80            name = Column("name", String(50))
81            addresses = relationship("Address", backref="user")
82
83        class Address(Base, fixtures.ComparableEntity):
84            __tablename__ = "addresses"
85
86            id = Column(
87                Integer, primary_key=True, test_needs_autoincrement=True
88            )
89            email = Column(String(50), key="_email")
90            user_id = Column(
91                "user_id", Integer, ForeignKey("users.id"), key="_user_id"
92            )
93
94        Base.metadata.create_all()
95
96        eq_(Address.__table__.c["id"].name, "id")
97        eq_(Address.__table__.c["_email"].name, "email")
98        eq_(Address.__table__.c["_user_id"].name, "user_id")
99
100        u1 = User(
101            name="u1", addresses=[Address(email="one"), Address(email="two")]
102        )
103        sess = create_session()
104        sess.add(u1)
105        sess.flush()
106        sess.expunge_all()
107
108        eq_(
109            sess.query(User).all(),
110            [
111                User(
112                    name="u1",
113                    addresses=[Address(email="one"), Address(email="two")],
114                )
115            ],
116        )
117
118        a1 = sess.query(Address).filter(Address.email == "two").one()
119        eq_(a1, Address(email="two"))
120        eq_(a1.user, User(name="u1"))
121
122    def test_deferred_reflection_default_error(self):
123        class MyExt(object):
124            @classmethod
125            def prepare(cls):
126                "sample prepare method"
127                to_map = _DeferredMapperConfig.classes_for_base(cls)
128                for thingy in to_map:
129                    thingy.map()
130
131            @classmethod
132            def _sa_decl_prepare(cls):
133                pass
134
135        class User(MyExt, Base):
136            __tablename__ = "user"
137            id = Column(Integer, primary_key=True)
138
139        assert_raises_message(
140            orm_exc.UnmappedClassError,
141            "Class test.ext.declarative.test_basic.User has a deferred "
142            "mapping on it.  It is not yet usable as a mapped class.",
143            Session().query,
144            User,
145        )
146
147        User.prepare()
148
149        self.assert_compile(
150            Session().query(User), 'SELECT "user".id AS user_id FROM "user"'
151        )
152
    def test_unicode_string_resolve(self):
        """relationship() resolves a target named by a ``util.u()`` string.

        Passes when ``User.addresses``, declared with
        ``util.u("Address")``, ends up pointing at the ``Address`` class.
        """

        class User(Base, fixtures.ComparableEntity):
            __tablename__ = "users"

            id = Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            )
            name = Column("name", String(50))
            # Address is not defined yet at this point; only its name is
            # given, wrapped in util.u()
            addresses = relationship(util.u("Address"), backref="user")

        class Address(Base, fixtures.ComparableEntity):
            __tablename__ = "addresses"

            id = Column(
                Integer, primary_key=True, test_needs_autoincrement=True
            )
            email = Column(String(50), key="_email")
            user_id = Column(
                "user_id", Integer, ForeignKey("users.id"), key="_user_id"
            )

        # the string target must have resolved to the Address class
        assert User.addresses.property.mapper.class_ is Address
175
    def test_unicode_string_resolve_backref(self):
        """backref() takes ``order_by`` as a ``util.u()`` string.

        Passes when ``Address.user`` still resolves to ``User`` with the
        string ``order_by`` placed on its backref.
        """

        class User(Base, fixtures.ComparableEntity):
            __tablename__ = "users"

            id = Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            )
            name = Column("name", String(50))

        class Address(Base, fixtures.ComparableEntity):
            __tablename__ = "addresses"

            id = Column(
                Integer, primary_key=True, test_needs_autoincrement=True
            )
            email = Column(String(50), key="_email")
            user_id = Column(
                "user_id", Integer, ForeignKey("users.id"), key="_user_id"
            )
            # the backref's ordering is given as a string naming this
            # class's own attribute
            user = relationship(
                User,
                backref=backref("addresses", order_by=util.u("Address.email")),
            )

        assert Address.user.property.mapper.class_ is User
201
202    def test_no_table(self):
203        def go():
204            class User(Base):
205                id = Column("id", Integer, primary_key=True)
206
207        assert_raises_message(
208            sa.exc.InvalidRequestError, "does not have a __table__", go
209        )
210
    def test_table_args_empty_dict(self):
        """An empty dict is accepted for ``__table_args__``.

        There is no assertion; the test passes when the class declaration
        does not raise.
        """

        class MyModel(Base):
            __tablename__ = "test"
            id = Column(Integer, primary_key=True)
            __table_args__ = {}
216
    def test_table_args_empty_tuple(self):
        """An empty tuple is accepted for ``__table_args__``.

        There is no assertion; the test passes when the class declaration
        does not raise.
        """

        class MyModel(Base):
            __tablename__ = "test"
            id = Column(Integer, primary_key=True)
            __table_args__ = ()
222
223    def test_cant_add_columns(self):
224        t = Table(
225            "t",
226            Base.metadata,
227            Column("id", Integer, primary_key=True),
228            Column("data", String),
229        )
230
231        def go():
232            class User(Base):
233                __table__ = t
234                foo = Column(Integer, primary_key=True)
235
236        # can't specify new columns not already in the table
237
238        assert_raises_message(
239            sa.exc.ArgumentError,
240            "Can't add additional column 'foo' when " "specifying __table__",
241            go,
242        )
243
244        # regular re-mapping works tho
245
246        class Bar(Base):
247            __table__ = t
248            some_data = t.c.data
249
250        assert (
251            class_mapper(Bar).get_property("some_data").columns[0] is t.c.data
252        )
253
    def test_lower_case_c_column_warning(self):
        """Warn when a lower-case sql ``column()`` is a class attribute.

        Four placements of ``sa.sql.expression.column(Integer)`` are
        covered: directly on the mapped class, on a plain mixin, behind a
        ``declared_attr`` on the mapped class, and behind a
        ``declared_attr`` on a mixin.  The expected warning names the class
        shown in each pattern.
        """

        # case 1: attribute set directly on the mapped class
        with assertions.expect_warnings(
            r"Attribute 'x' on class <class .*Foo.* appears to be a "
            r"non-schema 'sqlalchemy.sql.column\(\)' object; "
        ):

            class Foo(Base):
                __tablename__ = "foo"

                id = Column(Integer, primary_key=True)
                x = sa.sql.expression.column(Integer)
                y = Column(Integer)

        # case 2: attribute comes from a mixin; the mixin is declared
        # outside the expect_warnings() block and the warning names it
        class MyMixin(object):
            x = sa.sql.expression.column(Integer)
            y = Column(Integer)

        with assertions.expect_warnings(
            r"Attribute 'x' on class <class .*MyMixin.* appears to be a "
            r"non-schema 'sqlalchemy.sql.column\(\)' object; "
        ):

            class Foo2(MyMixin, Base):
                __tablename__ = "foo2"

                id = Column(Integer, primary_key=True)

        # case 3: attribute produced by a declared_attr on the mapped class
        with assertions.expect_warnings(
            r"Attribute 'x' on class <class .*Foo3.* appears to be a "
            r"non-schema 'sqlalchemy.sql.column\(\)' object; "
        ):

            class Foo3(Base):
                __tablename__ = "foo3"

                id = Column(Integer, primary_key=True)

                @declared_attr
                def x(cls):
                    return sa.sql.expression.column(Integer)

                y = Column(Integer)

        # case 4: attribute produced by a declared_attr on a mixin; here
        # the warning names the mapped subclass (Foo4), not the mixin
        with assertions.expect_warnings(
            r"Attribute 'x' on class <class .*Foo4.* appears to be a "
            r"non-schema 'sqlalchemy.sql.column\(\)' object; "
        ):

            class MyMixin2(object):
                @declared_attr
                def x(cls):
                    return sa.sql.expression.column(Integer)

                y = Column(Integer)

            class Foo4(MyMixin2, Base):
                __tablename__ = "foo4"

                id = Column(Integer, primary_key=True)
313
314    def test_column_named_twice(self):
315        def go():
316            class Foo(Base):
317                __tablename__ = "foo"
318
319                id = Column(Integer, primary_key=True)
320                x = Column("x", Integer)
321                y = Column("x", Integer)
322
323        assert_raises_message(
324            sa.exc.SAWarning,
325            "On class 'Foo', Column object 'x' named directly multiple times, "
326            "only one will be used: x, y",
327            go,
328        )
329
330    def test_column_repeated_under_prop(self):
331        def go():
332            class Foo(Base):
333                __tablename__ = "foo"
334
335                id = Column(Integer, primary_key=True)
336                x = Column("x", Integer)
337                y = column_property(x)
338                z = Column("x", Integer)
339
340        assert_raises_message(
341            sa.exc.SAWarning,
342            "On class 'Foo', Column object 'x' named directly multiple times, "
343            "only one will be used: x, y, z",
344            go,
345        )
346
347    def test_using_explicit_prop_in_schema_objects(self):
348        class Foo(Base):
349            __tablename__ = "foo"
350
351            id = Column(Integer, primary_key=True)
352            cprop = column_property(Column(Integer))
353
354            __table_args__ = (UniqueConstraint(cprop),)
355
356        uq = [
357            c
358            for c in Foo.__table__.constraints
359            if isinstance(c, UniqueConstraint)
360        ][0]
361        is_(uq.columns.cprop, Foo.__table__.c.cprop)
362
363        class Bar(Base):
364            __tablename__ = "bar"
365
366            id = Column(Integer, primary_key=True)
367            cprop = deferred(Column(Integer))
368
369            __table_args__ = (CheckConstraint(cprop > sa.func.foo()),)
370
371        ck = [
372            c
373            for c in Bar.__table__.constraints
374            if isinstance(c, CheckConstraint)
375        ][0]
376        is_(ck.columns.cprop, Bar.__table__.c.cprop)
377
378        if testing.requires.python3.enabled:
379            # test the existing failure case in case something changes
380            def go():
381                class Bat(Base):
382                    __tablename__ = "bat"
383
384                    id = Column(Integer, primary_key=True)
385                    cprop = deferred(Column(Integer))
386
387                    # we still can't do an expression like
388                    # "cprop > 5" because the column property isn't
389                    # a full blown column
390
391                    __table_args__ = (CheckConstraint(cprop > 5),)
392
393            assert_raises(TypeError, go)
394
    def test_relationship_level_msg_for_invalid_callable(self):
        """A string target that yields a Table gets a clear error message.

        ``relationship("a")`` uses the lower-case name, which matches the
        table name ``"a"`` and not the class name ``A``; mapper
        configuration is expected to fail with an ArgumentError naming the
        relationship and the received Table.
        """

        class A(Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)

        class B(Base):
            __tablename__ = "b"
            id = Column(Integer, primary_key=True)
            a_id = Column(Integer, ForeignKey("a.id"))
            a = relationship("a")

        # the error surfaces at configure time, not at class declaration
        assert_raises_message(
            sa.exc.ArgumentError,
            "relationship 'a' expects a class or a mapper "
            "argument .received: .*Table",
            configure_mappers,
        )
412
    def test_relationship_level_msg_for_invalid_object(self):
        """Passing a Table object as the target gets a clear error message.

        ``relationship(A.__table__)`` hands over the Table directly; mapper
        configuration is expected to fail with an ArgumentError naming the
        relationship and the received Table.
        """

        class A(Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)

        class B(Base):
            __tablename__ = "b"
            id = Column(Integer, primary_key=True)
            a_id = Column(Integer, ForeignKey("a.id"))
            a = relationship(A.__table__)

        # the error surfaces at configure time, not at class declaration
        assert_raises_message(
            sa.exc.ArgumentError,
            "relationship 'a' expects a class or a mapper "
            "argument .received: .*Table",
            configure_mappers,
        )
430
431    def test_difficult_class(self):
432        """test no getattr() errors with a customized class"""
433
434        # metaclass to mock the way zope.interface breaks getattr()
435        class BrokenMeta(type):
436            def __getattribute__(self, attr):
437                if attr == "xyzzy":
438                    raise AttributeError("xyzzy")
439                else:
440                    return object.__getattribute__(self, attr)
441
442        # even though this class has an xyzzy attribute, getattr(cls,"xyzzy")
443        # fails
444        class BrokenParent(with_metaclass(BrokenMeta)):
445            xyzzy = "magic"
446
447        # _as_declarative() inspects obj.__class__.__bases__
448        class User(BrokenParent, fixtures.ComparableEntity):
449            __tablename__ = "users"
450            id = Column(
451                "id", Integer, primary_key=True, test_needs_autoincrement=True
452            )
453            name = Column("name", String(50))
454
455        decl.instrument_declarative(User, {}, Base.metadata)
456
457    def test_reserved_identifiers(self):
458        def go1():
459            class User1(Base):
460                __tablename__ = "user1"
461                id = Column(Integer, primary_key=True)
462                metadata = Column(Integer)
463
464        def go2():
465            class User2(Base):
466                __tablename__ = "user2"
467                id = Column(Integer, primary_key=True)
468                metadata = relationship("Address")
469
470        for go in (go1, go2):
471            assert_raises_message(
472                exc.InvalidRequestError,
473                "Attribute name 'metadata' is reserved "
474                "for the MetaData instance when using a "
475                "declarative base class.",
476                go,
477            )
478
479    def test_undefer_column_name(self):
480        # TODO: not sure if there was an explicit
481        # test for this elsewhere
482        foo = Column(Integer)
483        eq_(str(foo), "(no name)")
484        eq_(foo.key, None)
485        eq_(foo.name, None)
486        decl.base._undefer_column_name("foo", foo)
487        eq_(str(foo), "foo")
488        eq_(foo.key, "foo")
489        eq_(foo.name, "foo")
490
    def test_recompile_on_othermapper(self):
        """declarative version of the same test in mappers.py

        Checks the ``Mapper._new_mappers`` flag: True once the classes are
        declared, False after a ``User`` has been instantiated and the
        backref-created ``User.addresses`` attribute has been accessed.
        """

        from sqlalchemy.orm import mapperlib

        class User(Base):
            __tablename__ = "users"

            id = Column("id", Integer, primary_key=True)
            name = Column("name", String(50))

        class Address(Base):
            __tablename__ = "addresses"

            id = Column("id", Integer, primary_key=True)
            email = Column("email", String(50))
            user_id = Column("user_id", Integer, ForeignKey("users.id"))
            # User.addresses exists only through this backref
            user = relationship(
                "User", primaryjoin=user_id == User.id, backref="addresses"
            )

        assert mapperlib.Mapper._new_mappers is True
        # NOTE(review): presumably instantiation is what triggers mapper
        # configuration here -- confirm against the mapper implementation.
        u = User()  # noqa
        assert User.addresses
        assert mapperlib.Mapper._new_mappers is False
516
517    def test_string_dependency_resolution(self):
518        class User(Base, fixtures.ComparableEntity):
519
520            __tablename__ = "users"
521            id = Column(
522                Integer, primary_key=True, test_needs_autoincrement=True
523            )
524            name = Column(String(50))
525            addresses = relationship(
526                "Address",
527                order_by="desc(Address.email)",
528                primaryjoin="User.id==Address.user_id",
529                foreign_keys="[Address.user_id]",
530                backref=backref(
531                    "user",
532                    primaryjoin="User.id==Address.user_id",
533                    foreign_keys="[Address.user_id]",
534                ),
535            )
536
537        class Address(Base, fixtures.ComparableEntity):
538
539            __tablename__ = "addresses"
540            id = Column(
541                Integer, primary_key=True, test_needs_autoincrement=True
542            )
543            email = Column(String(50))
544            user_id = Column(Integer)  # note no foreign key
545
546        Base.metadata.create_all()
547        sess = create_session()
548        u1 = User(
549            name="ed",
550            addresses=[
551                Address(email="abc"),
552                Address(email="def"),
553                Address(email="xyz"),
554            ],
555        )
556        sess.add(u1)
557        sess.flush()
558        sess.expunge_all()
559        eq_(
560            sess.query(User).filter(User.name == "ed").one(),
561            User(
562                name="ed",
563                addresses=[
564                    Address(email="xyz"),
565                    Address(email="def"),
566                    Address(email="abc"),
567                ],
568            ),
569        )
570
571        class Foo(Base, fixtures.ComparableEntity):
572
573            __tablename__ = "foo"
574            id = Column(Integer, primary_key=True)
575            rel = relationship("User", primaryjoin="User.addresses==Foo.id")
576
577        assert_raises_message(
578            exc.InvalidRequestError,
579            "'addresses' is not an instance of " "ColumnProperty",
580            configure_mappers,
581        )
582
583    def test_string_dependency_resolution_synonym(self):
584        class User(Base, fixtures.ComparableEntity):
585
586            __tablename__ = "users"
587            id = Column(
588                Integer, primary_key=True, test_needs_autoincrement=True
589            )
590            name = Column(String(50))
591
592        Base.metadata.create_all()
593        sess = create_session()
594        u1 = User(name="ed")
595        sess.add(u1)
596        sess.flush()
597        sess.expunge_all()
598        eq_(sess.query(User).filter(User.name == "ed").one(), User(name="ed"))
599
600        class Foo(Base, fixtures.ComparableEntity):
601
602            __tablename__ = "foo"
603            id = Column(Integer, primary_key=True)
604            _user_id = Column(Integer)
605            rel = relationship(
606                "User",
607                uselist=False,
608                foreign_keys=[User.id],
609                primaryjoin="Foo.user_id==User.id",
610            )
611
612            @synonym_for("_user_id")
613            @property
614            def user_id(self):
615                return self._user_id
616
617        foo = Foo()
618        foo.rel = u1
619        assert foo.rel == u1
620
621    def test_string_dependency_resolution_orm_descriptor(self):
622        from sqlalchemy.ext.hybrid import hybrid_property
623
624        class User(Base):
625            __tablename__ = "user"
626            id = Column(Integer, primary_key=True)
627            firstname = Column(String(50))
628            lastname = Column(String(50))
629            game_id = Column(Integer, ForeignKey("game.id"))
630
631            @hybrid_property
632            def fullname(self):
633                return self.firstname + " " + self.lastname
634
635        class Game(Base):
636            __tablename__ = "game"
637            id = Column(Integer, primary_key=True)
638            name = Column(String(50))
639            users = relationship("User", order_by="User.fullname")
640
641        s = Session()
642        self.assert_compile(
643            s.query(Game).options(joinedload(Game.users)),
644            "SELECT game.id AS game_id, game.name AS game_name, "
645            "user_1.id AS user_1_id, user_1.firstname AS user_1_firstname, "
646            "user_1.lastname AS user_1_lastname, "
647            "user_1.game_id AS user_1_game_id "
648            'FROM game LEFT OUTER JOIN "user" AS user_1 ON game.id = '
649            "user_1.game_id ORDER BY "
650            "user_1.firstname || :firstname_1 || user_1.lastname",
651        )
652
    def test_string_dependency_resolution_asselectable(self):
        """``secondary`` may be a string expression that builds a join().

        ``A.d`` names a chained ``join(B, D, ...).join(C, ...)`` as its
        secondary; the compiled query is expected to join to an aliased,
        parenthesized form of that join and then to ``d``.
        """

        class A(Base):
            __tablename__ = "a"

            id = Column(Integer, primary_key=True)
            b_id = Column(ForeignKey("b.id"))

            # B, C and D are only referred to by name here; they are
            # declared further down
            d = relationship(
                "D",
                secondary="join(B, D, B.d_id == D.id)."
                "join(C, C.d_id == D.id)",
                primaryjoin="and_(A.b_id == B.id, A.id == C.a_id)",
                secondaryjoin="D.id == B.d_id",
            )

        class B(Base):
            __tablename__ = "b"

            id = Column(Integer, primary_key=True)
            d_id = Column(ForeignKey("d.id"))

        class C(Base):
            __tablename__ = "c"

            id = Column(Integer, primary_key=True)
            a_id = Column(ForeignKey("a.id"))
            d_id = Column(ForeignKey("d.id"))

        class D(Base):
            __tablename__ = "d"

            id = Column(Integer, primary_key=True)

        s = Session()
        self.assert_compile(
            s.query(A).join(A.d),
            "SELECT a.id AS a_id, a.b_id AS a_b_id FROM a JOIN "
            "(b AS b_1 JOIN d AS d_1 ON b_1.d_id = d_1.id "
            "JOIN c AS c_1 ON c_1.d_id = d_1.id) ON a.b_id = b_1.id "
            "AND a.id = c_1.a_id JOIN d ON d.id = b_1.d_id",
        )
694
695    def test_string_dependency_resolution_no_table(self):
696        class User(Base, fixtures.ComparableEntity):
697            __tablename__ = "users"
698            id = Column(
699                Integer, primary_key=True, test_needs_autoincrement=True
700            )
701            name = Column(String(50))
702
703        class Bar(Base, fixtures.ComparableEntity):
704            __tablename__ = "bar"
705            id = Column(Integer, primary_key=True)
706            rel = relationship("User", primaryjoin="User.id==Bar.__table__.id")
707
708        assert_raises_message(
709            exc.InvalidRequestError,
710            "does not have a mapped column named " "'__table__'",
711            configure_mappers,
712        )
713
    def test_string_w_pj_annotations(self):
        """remote()/foreign() can be used inside a string primaryjoin.

        ``addresses.user_id`` has no ForeignKey, so the direction of the
        join comes only from the annotations in the string.
        """

        class User(Base, fixtures.ComparableEntity):
            __tablename__ = "users"
            id = Column(
                Integer, primary_key=True, test_needs_autoincrement=True
            )
            name = Column(String(50))

        class Address(Base, fixtures.ComparableEntity):

            __tablename__ = "addresses"
            id = Column(
                Integer, primary_key=True, test_needs_autoincrement=True
            )
            email = Column(String(50))
            user_id = Column(Integer)
            user = relationship(
                "User", primaryjoin="remote(User.id)==foreign(Address.user_id)"
            )

        # expected pairing: local addresses.user_id, remote users.id
        eq_(
            Address.user.property._join_condition.local_remote_pairs,
            [(Address.__table__.c.user_id, User.__table__.c.id)],
        )
738
739    def test_string_dependency_resolution_no_magic(self):
740        """test that full tinkery expressions work as written"""
741
742        class User(Base, fixtures.ComparableEntity):
743
744            __tablename__ = "users"
745            id = Column(Integer, primary_key=True)
746            addresses = relationship(
747                "Address",
748                primaryjoin="User.id==Address.user_id.prop.columns[0]",
749            )
750
751        class Address(Base, fixtures.ComparableEntity):
752
753            __tablename__ = "addresses"
754            id = Column(Integer, primary_key=True)
755            user_id = Column(Integer, ForeignKey("users.id"))
756
757        configure_mappers()
758        eq_(
759            str(User.addresses.prop.primaryjoin),
760            "users.id = addresses.user_id",
761        )
762
763    def test_string_dependency_resolution_module_qualified(self):
764        class User(Base, fixtures.ComparableEntity):
765
766            __tablename__ = "users"
767            id = Column(Integer, primary_key=True)
768            addresses = relationship(
769                "%s.Address" % __name__,
770                primaryjoin="%s.User.id==%s.Address.user_id.prop.columns[0]"
771                % (__name__, __name__),
772            )
773
774        class Address(Base, fixtures.ComparableEntity):
775
776            __tablename__ = "addresses"
777            id = Column(Integer, primary_key=True)
778            user_id = Column(Integer, ForeignKey("users.id"))
779
780        configure_mappers()
781        eq_(
782            str(User.addresses.prop.primaryjoin),
783            "users.id = addresses.user_id",
784        )
785
786    def test_string_dependency_resolution_in_backref(self):
787        class User(Base, fixtures.ComparableEntity):
788
789            __tablename__ = "users"
790            id = Column(Integer, primary_key=True)
791            name = Column(String(50))
792            addresses = relationship(
793                "Address",
794                primaryjoin="User.id==Address.user_id",
795                backref="user",
796            )
797
798        class Address(Base, fixtures.ComparableEntity):
799
800            __tablename__ = "addresses"
801            id = Column(Integer, primary_key=True)
802            email = Column(String(50))
803            user_id = Column(Integer, ForeignKey("users.id"))
804
805        configure_mappers()
806        eq_(
807            str(User.addresses.property.primaryjoin),
808            str(Address.user.property.primaryjoin),
809        )
810
811    def test_string_dependency_resolution_tables(self):
812        class User(Base, fixtures.ComparableEntity):
813
814            __tablename__ = "users"
815            id = Column(Integer, primary_key=True)
816            name = Column(String(50))
817            props = relationship(
818                "Prop",
819                secondary="user_to_prop",
820                primaryjoin="User.id==user_to_prop.c.u" "ser_id",
821                secondaryjoin="user_to_prop.c.prop_id=" "=Prop.id",
822                backref="users",
823            )
824
825        class Prop(Base, fixtures.ComparableEntity):
826
827            __tablename__ = "props"
828            id = Column(Integer, primary_key=True)
829            name = Column(String(50))
830
831        user_to_prop = Table(
832            "user_to_prop",
833            Base.metadata,
834            Column("user_id", Integer, ForeignKey("users.id")),
835            Column("prop_id", Integer, ForeignKey("props.id")),
836        )
837
838        configure_mappers()
839        assert (
840            class_mapper(User).get_property("props").secondary is user_to_prop
841        )
842
    def test_string_dependency_resolution_table_over_class(self):
        """``secondary`` picks the Table when a class shares its name.

        Both the mapped class and its table are called ``Secondary``; the
        ``secondary="Secondary"`` string is expected to resolve to
        ``Secondary.__table__``.
        """

        # test for second half of #5774
        class User(Base, fixtures.ComparableEntity):

            __tablename__ = "users"
            id = Column(Integer, primary_key=True)
            name = Column(String(50))
            props = relationship(
                "Prop",
                secondary="Secondary",
                backref="users",
            )

        class Prop(Base, fixtures.ComparableEntity):

            __tablename__ = "props"
            id = Column(Integer, primary_key=True)
            name = Column(String(50))

        # class name and table name match
        class Secondary(Base):
            __tablename__ = "Secondary"
            user_id = Column(Integer, ForeignKey("users.id"), primary_key=True)
            prop_id = Column(Integer, ForeignKey("props.id"), primary_key=True)

        configure_mappers()
        assert (
            class_mapper(User).get_property("props").secondary
            is Secondary.__table__
        )
873
    def test_string_dependency_resolution_class_over_table(self):
        """A relationship target picks the class when a Table shares its name.

        Both the mapped class and its table are called ``Secondary``; the
        ``relationship("Secondary")`` target is expected to resolve to the
        class's mapper.
        """

        # test for second half of #5774
        class User(Base, fixtures.ComparableEntity):

            __tablename__ = "users"
            id = Column(Integer, primary_key=True)
            name = Column(String(50))
            secondary = relationship(
                "Secondary",
            )

        # class name and table name match
        class Secondary(Base):
            __tablename__ = "Secondary"
            user_id = Column(Integer, ForeignKey("users.id"), primary_key=True)

        configure_mappers()
        assert (
            class_mapper(User).get_property("secondary").mapper
            is Secondary.__mapper__
        )
895
    def test_string_dependency_resolution_schemas(self):
        """String arguments may use schema-qualified table names.

        Passes when ``secondary="fooschema.user_to_prop"`` resolves to the
        Table created in that schema.
        """
        # local base that shadows the module-level Base; unlike the one
        # built in setup() it is created without the testing.db argument
        Base = decl.declarative_base()

        class User(Base):

            __tablename__ = "users"
            __table_args__ = {"schema": "fooschema"}

            id = Column(Integer, primary_key=True)
            name = Column(String(50))
            # the association table is always referred to as
            # "<schema>.<table>"
            props = relationship(
                "Prop",
                secondary="fooschema.user_to_prop",
                primaryjoin="User.id==fooschema.user_to_prop.c.user_id",
                secondaryjoin="fooschema.user_to_prop.c.prop_id==Prop.id",
                backref="users",
            )

        class Prop(Base):

            __tablename__ = "props"
            __table_args__ = {"schema": "fooschema"}

            id = Column(Integer, primary_key=True)
            name = Column(String(50))

        user_to_prop = Table(
            "user_to_prop",
            Base.metadata,
            Column("user_id", Integer, ForeignKey("fooschema.users.id")),
            Column("prop_id", Integer, ForeignKey("fooschema.props.id")),
            schema="fooschema",
        )
        configure_mappers()

        assert (
            class_mapper(User).get_property("props").secondary is user_to_prop
        )
934
935    def test_string_dependency_resolution_annotations(self):
936        Base = decl.declarative_base()
937
938        class Parent(Base):
939            __tablename__ = "parent"
940            id = Column(Integer, primary_key=True)
941            name = Column(String)
942            children = relationship(
943                "Child",
944                primaryjoin="Parent.name=="
945                "remote(foreign(func.lower(Child.name_upper)))",
946            )
947
948        class Child(Base):
949            __tablename__ = "child"
950            id = Column(Integer, primary_key=True)
951            name_upper = Column(String)
952
953        configure_mappers()
954        eq_(
955            Parent.children.property._calculated_foreign_keys,
956            set([Child.name_upper.property.columns[0]]),
957        )
958
959    def test_shared_class_registry(self):
960        reg = {}
961        Base1 = decl.declarative_base(testing.db, class_registry=reg)
962        Base2 = decl.declarative_base(testing.db, class_registry=reg)
963
964        class A(Base1):
965            __tablename__ = "a"
966            id = Column(Integer, primary_key=True)
967
968        class B(Base2):
969            __tablename__ = "b"
970            id = Column(Integer, primary_key=True)
971            aid = Column(Integer, ForeignKey(A.id))
972            as_ = relationship("A")
973
974        assert B.as_.property.mapper.class_ is A
975
    def test_uncompiled_attributes_in_relationship(self):
        """Class-bound attributes of a not-yet-configured class can be passed
        directly to ``relationship()``.

        ``User.addresses`` receives ``Address.email`` / ``Address.user_id``
        as ``order_by``, ``foreign_keys`` and ``remote_side`` before any
        mapper configuration has been triggered; the test then round-trips
        a User with three addresses and checks the loaded ordering.
        """

        class Address(Base, fixtures.ComparableEntity):

            __tablename__ = "addresses"
            id = Column(
                Integer, primary_key=True, test_needs_autoincrement=True
            )
            email = Column(String(50))
            user_id = Column(Integer, ForeignKey("users.id"))

        class User(Base, fixtures.ComparableEntity):

            __tablename__ = "users"
            id = Column(
                Integer, primary_key=True, test_needs_autoincrement=True
            )
            name = Column(String(50))
            # every argument below is an attribute of Address taken while
            # the mappers are still unconfigured
            addresses = relationship(
                "Address",
                order_by=Address.email,
                foreign_keys=Address.user_id,
                remote_side=Address.user_id,
            )

        # get the mapper for User.   User mapper will compile,
        # "addresses" relationship will call upon Address.user_id for
        # its clause element.  Address.user_id is a _CompileOnAttr,
        # which then calls class_mapper(Address).  But !  We're already
        # "in compilation", but class_mapper(Address) needs to
        # initialize regardless, or COA's assertion fails and things
        # generally go downhill from there.

        class_mapper(User)
        Base.metadata.create_all()
        sess = create_session()
        # the addresses are added out of alphabetical order ...
        u1 = User(
            name="ed",
            addresses=[
                Address(email="abc"),
                Address(email="xyz"),
                Address(email="def"),
            ],
        )
        sess.add(u1)
        sess.flush()
        sess.expunge_all()
        # ... and are expected back sorted by email, per order_by above
        eq_(
            sess.query(User).filter(User.name == "ed").one(),
            User(
                name="ed",
                addresses=[
                    Address(email="abc"),
                    Address(email="def"),
                    Address(email="xyz"),
                ],
            ),
        )
1033
1034    def test_nice_dependency_error(self):
1035        class User(Base):
1036
1037            __tablename__ = "users"
1038            id = Column("id", Integer, primary_key=True)
1039            addresses = relationship("Address")
1040
1041        class Address(Base):
1042
1043            __tablename__ = "addresses"
1044            id = Column(Integer, primary_key=True)
1045            foo = sa.orm.column_property(User.id == 5)
1046
1047        # this used to raise an error when accessing User.id but that's
1048        # no longer the case since we got rid of _CompileOnAttr.
1049
1050        assert_raises(sa.exc.ArgumentError, configure_mappers)
1051
    def test_nice_dependency_error_works_with_hasattr(self):
        """A mapper configuration failure keeps being reported after
        ``hasattr()`` has touched an attribute of the broken mapping.

        ``User.addresses`` targets "Address", which this test never
        defines.
        """

        class User(Base):

            __tablename__ = "users"
            id = Column("id", Integer, primary_key=True)
            addresses = relationship("Address")

        # hasattr() on a compile-loaded attribute
        # If InvalidRequestError escapes hasattr(), the test only accepts
        # that when sa.util.compat.py32 is truthy.
        # NOTE(review): presumably py32 flags interpreters whose hasattr()
        # swallows AttributeError only -- confirm against util.compat.
        try:
            hasattr(User.addresses, "property")
        except exc.InvalidRequestError:
            assert sa.util.compat.py32

        # the exception is preserved.  Remains the
        # same through repeated calls.
        for i in range(3):
            # the pattern is anchored at the start and left open after
            # "When initializing"
            assert_raises_message(
                sa.exc.InvalidRequestError,
                "^One or more mappers failed to initialize"
                " - can't proceed with initialization of other mappers. "
                r"Triggering mapper: 'mapped class User->users'. "
                "Original exception was: When initializing.*",
                configure_mappers,
            )
1076
1077    def test_custom_base(self):
1078        class MyBase(object):
1079            def foobar(self):
1080                return "foobar"
1081
1082        Base = decl.declarative_base(cls=MyBase)
1083        assert hasattr(Base, "metadata")
1084        assert Base().foobar() == "foobar"
1085
    def test_uses_get_on_class_col_fk(self):
        """A ForeignKey built from a class-bound column (``Master.id``)
        still gives the many-to-one relationship a ``use_get`` strategy.

        After both rows have been queried into the session, reading
        ``d1.master`` must emit zero SQL statements.
        """

        # test [ticket:1492]

        class Master(Base):

            __tablename__ = "master"
            id = Column(
                Integer, primary_key=True, test_needs_autoincrement=True
            )

        class Detail(Base):

            __tablename__ = "detail"
            id = Column(
                Integer, primary_key=True, test_needs_autoincrement=True
            )
            # the FK target is the mapped attribute, not a "master.id" string
            master_id = Column(None, ForeignKey(Master.id))
            master = relationship(Master)

        Base.metadata.create_all()
        configure_mappers()
        assert class_mapper(Detail).get_property("master").strategy.use_get
        m1 = Master()
        d1 = Detail(master=m1)
        sess = create_session()
        sess.add(d1)
        sess.flush()
        sess.expunge_all()
        d1 = sess.query(Detail).first()
        # NOTE(review): m1 is not read again; presumably the binding keeps
        # the loaded Master referenced so the session can hand it back
        # without a query -- confirm before removing.
        m1 = sess.query(Master).first()

        def go():
            assert d1.master

        self.assert_sql_count(testing.db, go, 0)
1122
1123    def test_index_doesnt_compile(self):
1124        class User(Base):
1125            __tablename__ = "users"
1126            id = Column("id", Integer, primary_key=True)
1127            name = Column("name", String(50))
1128            error = relationship("Address")
1129
1130        i = Index("my_index", User.name)
1131
1132        # compile fails due to the nonexistent Addresses relationship
1133        assert_raises(sa.exc.InvalidRequestError, configure_mappers)
1134
1135        # index configured
1136        assert i in User.__table__.indexes
1137        assert User.__table__.c.id not in set(i.columns)
1138        assert User.__table__.c.name in set(i.columns)
1139
1140        # tables create fine
1141        Base.metadata.create_all()
1142
1143    def test_add_prop(self):
1144        class User(Base, fixtures.ComparableEntity):
1145
1146            __tablename__ = "users"
1147            id = Column(
1148                "id", Integer, primary_key=True, test_needs_autoincrement=True
1149            )
1150
1151        User.name = Column("name", String(50))
1152        User.addresses = relationship("Address", backref="user")
1153
1154        class Address(Base, fixtures.ComparableEntity):
1155
1156            __tablename__ = "addresses"
1157            id = Column(
1158                Integer, primary_key=True, test_needs_autoincrement=True
1159            )
1160
1161        Address.email = Column(String(50), key="_email")
1162        Address.user_id = Column(
1163            "user_id", Integer, ForeignKey("users.id"), key="_user_id"
1164        )
1165        Base.metadata.create_all()
1166        eq_(Address.__table__.c["id"].name, "id")
1167        eq_(Address.__table__.c["_email"].name, "email")
1168        eq_(Address.__table__.c["_user_id"].name, "user_id")
1169        u1 = User(
1170            name="u1", addresses=[Address(email="one"), Address(email="two")]
1171        )
1172        sess = create_session()
1173        sess.add(u1)
1174        sess.flush()
1175        sess.expunge_all()
1176        eq_(
1177            sess.query(User).all(),
1178            [
1179                User(
1180                    name="u1",
1181                    addresses=[Address(email="one"), Address(email="two")],
1182                )
1183            ],
1184        )
1185        a1 = sess.query(Address).filter(Address.email == "two").one()
1186        eq_(a1, Address(email="two"))
1187        eq_(a1.user, User(name="u1"))
1188
1189    def test_alt_name_attr_subclass_column_inline(self):
1190        # [ticket:2900]
1191        class A(Base):
1192            __tablename__ = "a"
1193            id = Column("id", Integer, primary_key=True)
1194            data = Column("data")
1195
1196        class ASub(A):
1197            brap = A.data
1198
1199        assert ASub.brap.property is A.data.property
1200        assert isinstance(
1201            ASub.brap.original_property, properties.SynonymProperty
1202        )
1203
1204    def test_alt_name_attr_subclass_relationship_inline(self):
1205        # [ticket:2900]
1206        class A(Base):
1207            __tablename__ = "a"
1208            id = Column("id", Integer, primary_key=True)
1209            b_id = Column(Integer, ForeignKey("b.id"))
1210            b = relationship("B", backref="as_")
1211
1212        class B(Base):
1213            __tablename__ = "b"
1214            id = Column("id", Integer, primary_key=True)
1215
1216        configure_mappers()
1217
1218        class ASub(A):
1219            brap = A.b
1220
1221        assert ASub.brap.property is A.b.property
1222        assert isinstance(
1223            ASub.brap.original_property, properties.SynonymProperty
1224        )
1225        ASub(brap=B())
1226
1227    def test_alt_name_attr_subclass_column_attrset(self):
1228        # [ticket:2900]
1229        class A(Base):
1230            __tablename__ = "a"
1231            id = Column("id", Integer, primary_key=True)
1232            data = Column("data")
1233
1234        A.brap = A.data
1235        assert A.brap.property is A.data.property
1236        assert isinstance(A.brap.original_property, properties.SynonymProperty)
1237
1238    def test_alt_name_attr_subclass_relationship_attrset(self):
1239        # [ticket:2900]
1240        class A(Base):
1241            __tablename__ = "a"
1242            id = Column("id", Integer, primary_key=True)
1243            b_id = Column(Integer, ForeignKey("b.id"))
1244            b = relationship("B", backref="as_")
1245
1246        A.brap = A.b
1247
1248        class B(Base):
1249            __tablename__ = "b"
1250            id = Column("id", Integer, primary_key=True)
1251
1252        assert A.brap.property is A.b.property
1253        assert isinstance(A.brap.original_property, properties.SynonymProperty)
1254        A(brap=B())
1255
1256    def test_eager_order_by(self):
1257        class Address(Base, fixtures.ComparableEntity):
1258
1259            __tablename__ = "addresses"
1260            id = Column(
1261                "id", Integer, primary_key=True, test_needs_autoincrement=True
1262            )
1263            email = Column("email", String(50))
1264            user_id = Column("user_id", Integer, ForeignKey("users.id"))
1265
1266        class User(Base, fixtures.ComparableEntity):
1267
1268            __tablename__ = "users"
1269            id = Column(
1270                "id", Integer, primary_key=True, test_needs_autoincrement=True
1271            )
1272            name = Column("name", String(50))
1273            addresses = relationship("Address", order_by=Address.email)
1274
1275        Base.metadata.create_all()
1276        u1 = User(
1277            name="u1", addresses=[Address(email="two"), Address(email="one")]
1278        )
1279        sess = create_session()
1280        sess.add(u1)
1281        sess.flush()
1282        sess.expunge_all()
1283        eq_(
1284            sess.query(User).options(joinedload(User.addresses)).all(),
1285            [
1286                User(
1287                    name="u1",
1288                    addresses=[Address(email="one"), Address(email="two")],
1289                )
1290            ],
1291        )
1292
1293    def test_order_by_multi(self):
1294        class Address(Base, fixtures.ComparableEntity):
1295
1296            __tablename__ = "addresses"
1297            id = Column(
1298                "id", Integer, primary_key=True, test_needs_autoincrement=True
1299            )
1300            email = Column("email", String(50))
1301            user_id = Column("user_id", Integer, ForeignKey("users.id"))
1302
1303        class User(Base, fixtures.ComparableEntity):
1304
1305            __tablename__ = "users"
1306            id = Column(
1307                "id", Integer, primary_key=True, test_needs_autoincrement=True
1308            )
1309            name = Column("name", String(50))
1310            addresses = relationship(
1311                "Address", order_by=(Address.email, Address.id)
1312            )
1313
1314        Base.metadata.create_all()
1315        u1 = User(
1316            name="u1", addresses=[Address(email="two"), Address(email="one")]
1317        )
1318        sess = create_session()
1319        sess.add(u1)
1320        sess.flush()
1321        sess.expunge_all()
1322        u = sess.query(User).filter(User.name == "u1").one()
1323        u.addresses
1324
1325    def test_as_declarative(self):
1326        class User(fixtures.ComparableEntity):
1327
1328            __tablename__ = "users"
1329            id = Column(
1330                "id", Integer, primary_key=True, test_needs_autoincrement=True
1331            )
1332            name = Column("name", String(50))
1333            addresses = relationship("Address", backref="user")
1334
1335        class Address(fixtures.ComparableEntity):
1336
1337            __tablename__ = "addresses"
1338            id = Column(
1339                "id", Integer, primary_key=True, test_needs_autoincrement=True
1340            )
1341            email = Column("email", String(50))
1342            user_id = Column("user_id", Integer, ForeignKey("users.id"))
1343
1344        reg = {}
1345        decl.instrument_declarative(User, reg, Base.metadata)
1346        decl.instrument_declarative(Address, reg, Base.metadata)
1347        Base.metadata.create_all()
1348        u1 = User(
1349            name="u1", addresses=[Address(email="one"), Address(email="two")]
1350        )
1351        sess = create_session()
1352        sess.add(u1)
1353        sess.flush()
1354        sess.expunge_all()
1355        eq_(
1356            sess.query(User).all(),
1357            [
1358                User(
1359                    name="u1",
1360                    addresses=[Address(email="one"), Address(email="two")],
1361                )
1362            ],
1363        )
1364
1365    def test_custom_mapper_attribute(self):
1366        def mymapper(cls, tbl, **kwargs):
1367            m = sa.orm.mapper(cls, tbl, **kwargs)
1368            m.CHECK = True
1369            return m
1370
1371        base = decl.declarative_base()
1372
1373        class Foo(base):
1374            __tablename__ = "foo"
1375            __mapper_cls__ = mymapper
1376            id = Column(Integer, primary_key=True)
1377
1378        eq_(Foo.__mapper__.CHECK, True)
1379
1380    def test_custom_mapper_argument(self):
1381        def mymapper(cls, tbl, **kwargs):
1382            m = sa.orm.mapper(cls, tbl, **kwargs)
1383            m.CHECK = True
1384            return m
1385
1386        base = decl.declarative_base(mapper=mymapper)
1387
1388        class Foo(base):
1389            __tablename__ = "foo"
1390            id = Column(Integer, primary_key=True)
1391
1392        eq_(Foo.__mapper__.CHECK, True)
1393
1394    def test_no_change_to_all_descriptors(self):
1395        base = decl.declarative_base()
1396
1397        class Foo(base):
1398            __tablename__ = "foo"
1399            id = Column(Integer, primary_key=True)
1400
1401        eq_(Foo.__mapper__.all_orm_descriptors.keys(), ["id"])
1402
1403    def test_oops(self):
1404
1405        with testing.expect_warnings(
1406            "Ignoring declarative-like tuple value of " "attribute 'name'"
1407        ):
1408
1409            class User(Base, fixtures.ComparableEntity):
1410
1411                __tablename__ = "users"
1412                id = Column("id", Integer, primary_key=True)
1413                name = (Column("name", String(50)),)
1414
1415    def test_table_args_no_dict(self):
1416        class Foo1(Base):
1417
1418            __tablename__ = "foo"
1419            __table_args__ = (ForeignKeyConstraint(["id"], ["foo.bar"]),)
1420            id = Column("id", Integer, primary_key=True)
1421            bar = Column("bar", Integer)
1422
1423        assert Foo1.__table__.c.id.references(Foo1.__table__.c.bar)
1424
1425    def test_table_args_type(self):
1426        def err():
1427            class Foo1(Base):
1428
1429                __tablename__ = "foo"
1430                __table_args__ = ForeignKeyConstraint(["id"], ["foo.id"])
1431                id = Column("id", Integer, primary_key=True)
1432
1433        assert_raises_message(
1434            sa.exc.ArgumentError, "__table_args__ value must be a tuple, ", err
1435        )
1436
1437    def test_table_args_none(self):
1438        class Foo2(Base):
1439
1440            __tablename__ = "foo"
1441            __table_args__ = None
1442            id = Column("id", Integer, primary_key=True)
1443
1444        assert Foo2.__table__.kwargs == {}
1445
1446    def test_table_args_dict_format(self):
1447        class Foo2(Base):
1448
1449            __tablename__ = "foo"
1450            __table_args__ = {"mysql_engine": "InnoDB"}
1451            id = Column("id", Integer, primary_key=True)
1452
1453        assert Foo2.__table__.kwargs["mysql_engine"] == "InnoDB"
1454
1455    def test_table_args_tuple_format(self):
1456        class Foo2(Base):
1457
1458            __tablename__ = "foo"
1459            __table_args__ = {"mysql_engine": "InnoDB"}
1460            id = Column("id", Integer, primary_key=True)
1461
1462        class Bar(Base):
1463
1464            __tablename__ = "bar"
1465            __table_args__ = (
1466                ForeignKeyConstraint(["id"], ["foo.id"]),
1467                {"mysql_engine": "InnoDB"},
1468            )
1469            id = Column("id", Integer, primary_key=True)
1470
1471        assert Bar.__table__.c.id.references(Foo2.__table__.c.id)
1472        assert Bar.__table__.kwargs["mysql_engine"] == "InnoDB"
1473
1474    def test_table_cls_attribute(self):
1475        class Foo(Base):
1476            __tablename__ = "foo"
1477
1478            @classmethod
1479            def __table_cls__(cls, *arg, **kw):
1480                name = arg[0]
1481                return Table(name + "bat", *arg[1:], **kw)
1482
1483            id = Column(Integer, primary_key=True)
1484
1485        eq_(Foo.__table__.name, "foobat")
1486
1487    def test_table_cls_attribute_return_none(self):
1488        from sqlalchemy.schema import Column, PrimaryKeyConstraint
1489
1490        class AutoTable(object):
1491            @declared_attr
1492            def __tablename__(cls):
1493                return cls.__name__
1494
1495            @classmethod
1496            def __table_cls__(cls, *arg, **kw):
1497                for obj in arg[1:]:
1498                    if (
1499                        isinstance(obj, Column) and obj.primary_key
1500                    ) or isinstance(obj, PrimaryKeyConstraint):
1501                        return Table(*arg, **kw)
1502
1503                return None
1504
1505        class Person(AutoTable, Base):
1506            id = Column(Integer, primary_key=True)
1507
1508        class Employee(Person):
1509            employee_name = Column(String)
1510
1511        is_(inspect(Employee).local_table, Person.__table__)
1512
1513    def test_expression(self):
1514        class User(Base, fixtures.ComparableEntity):
1515
1516            __tablename__ = "users"
1517            id = Column(
1518                "id", Integer, primary_key=True, test_needs_autoincrement=True
1519            )
1520            name = Column("name", String(50))
1521            addresses = relationship("Address", backref="user")
1522
1523        class Address(Base, fixtures.ComparableEntity):
1524
1525            __tablename__ = "addresses"
1526            id = Column(
1527                "id", Integer, primary_key=True, test_needs_autoincrement=True
1528            )
1529            email = Column("email", String(50))
1530            user_id = Column("user_id", Integer, ForeignKey("users.id"))
1531
1532        User.address_count = sa.orm.column_property(
1533            sa.select([sa.func.count(Address.id)])
1534            .where(Address.user_id == User.id)
1535            .as_scalar()
1536        )
1537        Base.metadata.create_all()
1538        u1 = User(
1539            name="u1", addresses=[Address(email="one"), Address(email="two")]
1540        )
1541        sess = create_session()
1542        sess.add(u1)
1543        sess.flush()
1544        sess.expunge_all()
1545        eq_(
1546            sess.query(User).all(),
1547            [
1548                User(
1549                    name="u1",
1550                    address_count=2,
1551                    addresses=[Address(email="one"), Address(email="two")],
1552                )
1553            ],
1554        )
1555
1556    def test_useless_declared_attr(self):
1557        class Address(Base, fixtures.ComparableEntity):
1558
1559            __tablename__ = "addresses"
1560            id = Column(
1561                "id", Integer, primary_key=True, test_needs_autoincrement=True
1562            )
1563            email = Column("email", String(50))
1564            user_id = Column("user_id", Integer, ForeignKey("users.id"))
1565
1566        class User(Base, fixtures.ComparableEntity):
1567
1568            __tablename__ = "users"
1569            id = Column(
1570                "id", Integer, primary_key=True, test_needs_autoincrement=True
1571            )
1572            name = Column("name", String(50))
1573            addresses = relationship("Address", backref="user")
1574
1575            @declared_attr
1576            def address_count(cls):
1577                # this doesn't really gain us anything.  but if
1578                # one is used, lets have it function as expected...
1579                return sa.orm.column_property(
1580                    sa.select([sa.func.count(Address.id)]).where(
1581                        Address.user_id == cls.id
1582                    )
1583                )
1584
1585        Base.metadata.create_all()
1586        u1 = User(
1587            name="u1", addresses=[Address(email="one"), Address(email="two")]
1588        )
1589        sess = create_session()
1590        sess.add(u1)
1591        sess.flush()
1592        sess.expunge_all()
1593        eq_(
1594            sess.query(User).all(),
1595            [
1596                User(
1597                    name="u1",
1598                    address_count=2,
1599                    addresses=[Address(email="one"), Address(email="two")],
1600                )
1601            ],
1602        )
1603
1604    def test_declared_on_base_class(self):
1605        class MyBase(Base):
1606            __tablename__ = "foo"
1607            id = Column(Integer, primary_key=True)
1608
1609            @declared_attr
1610            def somecol(cls):
1611                return Column(Integer)
1612
1613        class MyClass(MyBase):
1614            __tablename__ = "bar"
1615            id = Column(Integer, ForeignKey("foo.id"), primary_key=True)
1616
1617        # previously, the 'somecol' declared_attr would be ignored
1618        # by the mapping and would remain unused.  now we take
1619        # it as part of MyBase.
1620
1621        assert "somecol" in MyBase.__table__.c
1622        assert "somecol" not in MyClass.__table__.c
1623
1624    def test_decl_cascading_warns_non_mixin(self):
1625        with expect_warnings(
1626            "Use of @declared_attr.cascading only applies to "
1627            "Declarative 'mixin' and 'abstract' classes.  "
1628            "Currently, this flag is ignored on mapped class "
1629            "<class '.*.MyBase'>"
1630        ):
1631
1632            class MyBase(Base):
1633                __tablename__ = "foo"
1634                id = Column(Integer, primary_key=True)
1635
1636                @declared_attr.cascading
1637                def somecol(cls):
1638                    return Column(Integer)
1639
1640    def test_column(self):
1641        class User(Base, fixtures.ComparableEntity):
1642
1643            __tablename__ = "users"
1644            id = Column(
1645                "id", Integer, primary_key=True, test_needs_autoincrement=True
1646            )
1647            name = Column("name", String(50))
1648
1649        User.a = Column("a", String(10))
1650        User.b = Column(String(10))
1651        Base.metadata.create_all()
1652        u1 = User(name="u1", a="a", b="b")
1653        eq_(u1.a, "a")
1654        eq_(User.a.get_history(u1), (["a"], (), ()))
1655        sess = create_session()
1656        sess.add(u1)
1657        sess.flush()
1658        sess.expunge_all()
1659        eq_(sess.query(User).all(), [User(name="u1", a="a", b="b")])
1660
1661    def test_column_properties(self):
1662        class Address(Base, fixtures.ComparableEntity):
1663
1664            __tablename__ = "addresses"
1665            id = Column(
1666                Integer, primary_key=True, test_needs_autoincrement=True
1667            )
1668            email = Column(String(50))
1669            user_id = Column(Integer, ForeignKey("users.id"))
1670
1671        class User(Base, fixtures.ComparableEntity):
1672
1673            __tablename__ = "users"
1674            id = Column(
1675                "id", Integer, primary_key=True, test_needs_autoincrement=True
1676            )
1677            name = Column("name", String(50))
1678
1679            adr_count = sa.orm.column_property(
1680                sa.select(
1681                    [sa.func.count(Address.id)], Address.user_id == id
1682                ).as_scalar()
1683            )
1684            addresses = relationship(Address)
1685
1686        Base.metadata.create_all()
1687        u1 = User(
1688            name="u1", addresses=[Address(email="one"), Address(email="two")]
1689        )
1690        sess = create_session()
1691        sess.add(u1)
1692        sess.flush()
1693        sess.expunge_all()
1694        eq_(
1695            sess.query(User).all(),
1696            [
1697                User(
1698                    name="u1",
1699                    adr_count=2,
1700                    addresses=[Address(email="one"), Address(email="two")],
1701                )
1702            ],
1703        )
1704
1705    def test_column_properties_2(self):
1706        class Address(Base, fixtures.ComparableEntity):
1707
1708            __tablename__ = "addresses"
1709            id = Column(Integer, primary_key=True)
1710            email = Column(String(50))
1711            user_id = Column(Integer, ForeignKey("users.id"))
1712
1713        class User(Base, fixtures.ComparableEntity):
1714
1715            __tablename__ = "users"
1716            id = Column("id", Integer, primary_key=True)
1717            name = Column("name", String(50))
1718
1719            # this is not "valid" but we want to test that Address.id
1720            # doesn't get stuck into user's table
1721
1722            adr_count = Address.id
1723
1724        eq_(set(User.__table__.c.keys()), set(["id", "name"]))
1725        eq_(set(Address.__table__.c.keys()), set(["id", "email", "user_id"]))
1726
1727    def test_deferred(self):
1728        class User(Base, fixtures.ComparableEntity):
1729
1730            __tablename__ = "users"
1731            id = Column(
1732                Integer, primary_key=True, test_needs_autoincrement=True
1733            )
1734            name = sa.orm.deferred(Column(String(50)))
1735
1736        Base.metadata.create_all()
1737        sess = create_session()
1738        sess.add(User(name="u1"))
1739        sess.flush()
1740        sess.expunge_all()
1741        u1 = sess.query(User).filter(User.name == "u1").one()
1742        assert "name" not in u1.__dict__
1743
1744        def go():
1745            eq_(u1.name, "u1")
1746
1747        self.assert_sql_count(testing.db, go, 1)
1748
1749    def test_composite_inline(self):
1750        class AddressComposite(fixtures.ComparableEntity):
1751            def __init__(self, street, state):
1752                self.street = street
1753                self.state = state
1754
1755            def __composite_values__(self):
1756                return [self.street, self.state]
1757
1758        class User(Base, fixtures.ComparableEntity):
1759            __tablename__ = "user"
1760            id = Column(
1761                Integer, primary_key=True, test_needs_autoincrement=True
1762            )
1763            address = composite(
1764                AddressComposite,
1765                Column("street", String(50)),
1766                Column("state", String(2)),
1767            )
1768
1769        Base.metadata.create_all()
1770        sess = Session()
1771        sess.add(User(address=AddressComposite("123 anywhere street", "MD")))
1772        sess.commit()
1773        eq_(
1774            sess.query(User).all(),
1775            [User(address=AddressComposite("123 anywhere street", "MD"))],
1776        )
1777
1778    def test_composite_separate(self):
1779        class AddressComposite(fixtures.ComparableEntity):
1780            def __init__(self, street, state):
1781                self.street = street
1782                self.state = state
1783
1784            def __composite_values__(self):
1785                return [self.street, self.state]
1786
1787        class User(Base, fixtures.ComparableEntity):
1788            __tablename__ = "user"
1789            id = Column(
1790                Integer, primary_key=True, test_needs_autoincrement=True
1791            )
1792            street = Column(String(50))
1793            state = Column(String(2))
1794            address = composite(AddressComposite, street, state)
1795
1796        Base.metadata.create_all()
1797        sess = Session()
1798        sess.add(User(address=AddressComposite("123 anywhere street", "MD")))
1799        sess.commit()
1800        eq_(
1801            sess.query(User).all(),
1802            [User(address=AddressComposite("123 anywhere street", "MD"))],
1803        )
1804
1805    def test_mapping_to_join(self):
1806        users = Table(
1807            "users", Base.metadata, Column("id", Integer, primary_key=True)
1808        )
1809        addresses = Table(
1810            "addresses",
1811            Base.metadata,
1812            Column("id", Integer, primary_key=True),
1813            Column("user_id", Integer, ForeignKey("users.id")),
1814        )
1815        usersaddresses = sa.join(
1816            users, addresses, users.c.id == addresses.c.user_id
1817        )
1818
1819        class User(Base):
1820            __table__ = usersaddresses
1821            __table_args__ = {"primary_key": [users.c.id]}
1822
1823            # need to use column_property for now
1824            user_id = column_property(users.c.id, addresses.c.user_id)
1825            address_id = addresses.c.id
1826
1827        assert User.__mapper__.get_property("user_id").columns[0] is users.c.id
1828        assert (
1829            User.__mapper__.get_property("user_id").columns[1]
1830            is addresses.c.user_id
1831        )
1832
1833    def test_synonym_inline(self):
1834        class User(Base, fixtures.ComparableEntity):
1835
1836            __tablename__ = "users"
1837            id = Column(
1838                "id", Integer, primary_key=True, test_needs_autoincrement=True
1839            )
1840            _name = Column("name", String(50))
1841
1842            def _set_name(self, name):
1843                self._name = "SOMENAME " + name
1844
1845            def _get_name(self):
1846                return self._name
1847
1848            name = sa.orm.synonym(
1849                "_name", descriptor=property(_get_name, _set_name)
1850            )
1851
1852        Base.metadata.create_all()
1853        sess = create_session()
1854        u1 = User(name="someuser")
1855        eq_(u1.name, "SOMENAME someuser")
1856        sess.add(u1)
1857        sess.flush()
1858        eq_(
1859            sess.query(User).filter(User.name == "SOMENAME someuser").one(), u1
1860        )
1861
1862    def test_synonym_no_descriptor(self):
1863        from sqlalchemy.orm.properties import ColumnProperty
1864
1865        class CustomCompare(ColumnProperty.Comparator):
1866
1867            __hash__ = None
1868
1869            def __eq__(self, other):
1870                return self.__clause_element__() == other + " FOO"
1871
1872        class User(Base, fixtures.ComparableEntity):
1873
1874            __tablename__ = "users"
1875            id = Column(
1876                "id", Integer, primary_key=True, test_needs_autoincrement=True
1877            )
1878            _name = Column("name", String(50))
1879            name = sa.orm.synonym("_name", comparator_factory=CustomCompare)
1880
1881        Base.metadata.create_all()
1882        sess = create_session()
1883        u1 = User(name="someuser FOO")
1884        sess.add(u1)
1885        sess.flush()
1886        eq_(sess.query(User).filter(User.name == "someuser").one(), u1)
1887
1888    def test_synonym_added(self):
1889        class User(Base, fixtures.ComparableEntity):
1890
1891            __tablename__ = "users"
1892            id = Column(
1893                "id", Integer, primary_key=True, test_needs_autoincrement=True
1894            )
1895            _name = Column("name", String(50))
1896
1897            def _set_name(self, name):
1898                self._name = "SOMENAME " + name
1899
1900            def _get_name(self):
1901                return self._name
1902
1903            name = property(_get_name, _set_name)
1904
1905        User.name = sa.orm.synonym("_name", descriptor=User.name)
1906        Base.metadata.create_all()
1907        sess = create_session()
1908        u1 = User(name="someuser")
1909        eq_(u1.name, "SOMENAME someuser")
1910        sess.add(u1)
1911        sess.flush()
1912        eq_(
1913            sess.query(User).filter(User.name == "SOMENAME someuser").one(), u1
1914        )
1915
1916    def test_reentrant_compile_via_foreignkey(self):
1917        class User(Base, fixtures.ComparableEntity):
1918
1919            __tablename__ = "users"
1920            id = Column(
1921                "id", Integer, primary_key=True, test_needs_autoincrement=True
1922            )
1923            name = Column("name", String(50))
1924            addresses = relationship("Address", backref="user")
1925
1926        class Address(Base, fixtures.ComparableEntity):
1927
1928            __tablename__ = "addresses"
1929            id = Column(
1930                "id", Integer, primary_key=True, test_needs_autoincrement=True
1931            )
1932            email = Column("email", String(50))
1933            user_id = Column("user_id", Integer, ForeignKey(User.id))
1934
1935        # previous versions would force a re-entrant mapper compile via
1936        # the User.id inside the ForeignKey but this is no longer the
1937        # case
1938
1939        sa.orm.configure_mappers()
1940        eq_(
1941            list(Address.user_id.property.columns[0].foreign_keys)[0].column,
1942            User.__table__.c.id,
1943        )
1944        Base.metadata.create_all()
1945        u1 = User(
1946            name="u1", addresses=[Address(email="one"), Address(email="two")]
1947        )
1948        sess = create_session()
1949        sess.add(u1)
1950        sess.flush()
1951        sess.expunge_all()
1952        eq_(
1953            sess.query(User).all(),
1954            [
1955                User(
1956                    name="u1",
1957                    addresses=[Address(email="one"), Address(email="two")],
1958                )
1959            ],
1960        )
1961
1962    def test_relationship_reference(self):
1963        class Address(Base, fixtures.ComparableEntity):
1964
1965            __tablename__ = "addresses"
1966            id = Column(
1967                "id", Integer, primary_key=True, test_needs_autoincrement=True
1968            )
1969            email = Column("email", String(50))
1970            user_id = Column("user_id", Integer, ForeignKey("users.id"))
1971
1972        class User(Base, fixtures.ComparableEntity):
1973
1974            __tablename__ = "users"
1975            id = Column(
1976                "id", Integer, primary_key=True, test_needs_autoincrement=True
1977            )
1978            name = Column("name", String(50))
1979            addresses = relationship(
1980                "Address", backref="user", primaryjoin=id == Address.user_id
1981            )
1982
1983        User.address_count = sa.orm.column_property(
1984            sa.select([sa.func.count(Address.id)])
1985            .where(Address.user_id == User.id)
1986            .as_scalar()
1987        )
1988        Base.metadata.create_all()
1989        u1 = User(
1990            name="u1", addresses=[Address(email="one"), Address(email="two")]
1991        )
1992        sess = create_session()
1993        sess.add(u1)
1994        sess.flush()
1995        sess.expunge_all()
1996        eq_(
1997            sess.query(User).all(),
1998            [
1999                User(
2000                    name="u1",
2001                    address_count=2,
2002                    addresses=[Address(email="one"), Address(email="two")],
2003                )
2004            ],
2005        )
2006
2007    def test_pk_with_fk_init(self):
2008        class Bar(Base):
2009
2010            __tablename__ = "bar"
2011            id = sa.Column(
2012                sa.Integer, sa.ForeignKey("foo.id"), primary_key=True
2013            )
2014            ex = sa.Column(sa.Integer, primary_key=True)
2015
2016        class Foo(Base):
2017
2018            __tablename__ = "foo"
2019            id = sa.Column(sa.Integer, primary_key=True)
2020            bars = sa.orm.relationship(Bar)
2021
2022        assert Bar.__mapper__.primary_key[0] is Bar.__table__.c.id
2023        assert Bar.__mapper__.primary_key[1] is Bar.__table__.c.ex
2024
2025    def test_with_explicit_autoloaded(self):
2026        meta = MetaData(testing.db)
2027        t1 = Table(
2028            "t1",
2029            meta,
2030            Column("id", String(50), primary_key=True),
2031            Column("data", String(50)),
2032        )
2033        meta.create_all()
2034        try:
2035
2036            class MyObj(Base):
2037
2038                __table__ = Table("t1", Base.metadata, autoload=True)
2039
2040            sess = create_session()
2041            m = MyObj(id="someid", data="somedata")
2042            sess.add(m)
2043            sess.flush()
2044            eq_(t1.select().execute().fetchall(), [("someid", "somedata")])
2045        finally:
2046            meta.drop_all()
2047
2048    def test_synonym_for(self):
2049        class User(Base, fixtures.ComparableEntity):
2050
2051            __tablename__ = "users"
2052            id = Column(
2053                "id", Integer, primary_key=True, test_needs_autoincrement=True
2054            )
2055            name = Column("name", String(50))
2056
2057            @decl.synonym_for("name")
2058            @property
2059            def namesyn(self):
2060                return self.name
2061
2062        Base.metadata.create_all()
2063        sess = create_session()
2064        u1 = User(name="someuser")
2065        eq_(u1.name, "someuser")
2066        eq_(u1.namesyn, "someuser")
2067        sess.add(u1)
2068        sess.flush()
2069        rt = sess.query(User).filter(User.namesyn == "someuser").one()
2070        eq_(rt, u1)
2071
2072    def test_duplicate_classes_in_base(self):
2073        class Test(Base):
2074            __tablename__ = "a"
2075            id = Column(Integer, primary_key=True)
2076
2077        assert_raises_message(
2078            sa.exc.SAWarning,
2079            "This declarative base already contains a class with ",
2080            lambda: type(Base)(
2081                "Test",
2082                (Base,),
2083                dict(__tablename__="b", id=Column(Integer, primary_key=True)),
2084            ),
2085        )
2086
2087    @testing.teardown_events(MapperEvents)
2088    def test_instrument_class_before_instrumentation(self):
2089        # test #3388
2090
2091        canary = mock.Mock()
2092
2093        @event.listens_for(mapper, "instrument_class")
2094        def instrument_class(mp, cls):
2095            canary.instrument_class(mp, cls)
2096
2097        @event.listens_for(object, "class_instrument")
2098        def class_instrument(cls):
2099            canary.class_instrument(cls)
2100
2101        class Test(Base):
2102            __tablename__ = "test"
2103            id = Column(Integer, primary_key=True)
2104
2105        eq_(
2106            canary.mock_calls,
2107            [
2108                mock.call.instrument_class(Test.__mapper__, Test),
2109                mock.call.class_instrument(Test),
2110            ],
2111        )
2112
2113    def test_cls_docstring(self):
2114        class MyBase(object):
2115            """MyBase Docstring"""
2116
2117        Base = decl.declarative_base(cls=MyBase)
2118
2119        eq_(Base.__doc__, MyBase.__doc__)
2120
2121    def test_delattr_mapped_raises(self):
2122        Base = decl.declarative_base()
2123
2124        class Foo(Base):
2125            __tablename__ = "foo"
2126
2127            id = Column(Integer, primary_key=True)
2128            data = Column(String)
2129
2130        def go():
2131            del Foo.data
2132
2133        assert_raises_message(
2134            NotImplementedError,
2135            "Can't un-map individual mapped attributes on a mapped class.",
2136            go,
2137        )
2138
2139    def test_delattr_hybrid_fine(self):
2140        Base = decl.declarative_base()
2141
2142        class Foo(Base):
2143            __tablename__ = "foo"
2144
2145            id = Column(Integer, primary_key=True)
2146            data = Column(String)
2147
2148            @hybrid_property
2149            def data_hybrid(self):
2150                return self.data
2151
2152        assert "data_hybrid" in Foo.__mapper__.all_orm_descriptors.keys()
2153
2154        del Foo.data_hybrid
2155
2156        assert "data_hybrid" not in Foo.__mapper__.all_orm_descriptors.keys()
2157
2158        assert not hasattr(Foo, "data_hybrid")
2159
2160    def test_setattr_hybrid_updates_descriptors(self):
2161        Base = decl.declarative_base()
2162
2163        class Foo(Base):
2164            __tablename__ = "foo"
2165
2166            id = Column(Integer, primary_key=True)
2167            data = Column(String)
2168
2169        assert "data_hybrid" not in Foo.__mapper__.all_orm_descriptors.keys()
2170
2171        @hybrid_property
2172        def data_hybrid(self):
2173            return self.data
2174
2175        Foo.data_hybrid = data_hybrid
2176        assert "data_hybrid" in Foo.__mapper__.all_orm_descriptors.keys()
2177
2178        del Foo.data_hybrid
2179
2180        assert "data_hybrid" not in Foo.__mapper__.all_orm_descriptors.keys()
2181
2182        assert not hasattr(Foo, "data_hybrid")
2183
2184    @testing.requires.python36
2185    def test_kw_support_in_declarative_meta_init(self):
2186        # This will not fail if DeclarativeMeta __init__ supports **kw
2187
2188        class BaseUser(Base):
2189            __tablename__ = "base"
2190            id_ = Column(Integer, primary_key=True)
2191
2192            @classmethod
2193            def __init_subclass__(cls, random_keyword=False, **kw):
2194                super().__init_subclass__(**kw)
2195                cls._set_random_keyword_used_here = random_keyword
2196
2197        class User(BaseUser):
2198            __tablename__ = "user"
2199            id_ = Column(Integer, ForeignKey("base.id_"), primary_key=True)
2200
2201        # Check the default option
2202        eq_(User._set_random_keyword_used_here, False)
2203
2204        # Build the metaclass with a keyword!
2205        bases = (BaseUser,)
2206        UserType = DeclarativeMeta("UserType", bases, {}, random_keyword=True)
2207
2208        # Check to see if __init_subclass__ works in supported versions
2209        eq_(UserType._set_random_keyword_used_here, True)
2210
2211
def _produce_test(inline, stringbased):
    """Build one ``ExplicitJoinTest`` variant.

    :param inline: when true, ``Address.user`` is declared inside the
     ``Address`` class body; when false it is assigned onto the class
     after ``configure_mappers()`` has been called.

    :param stringbased: when true, the relationship target and its
     ``primaryjoin`` are passed as strings; when false the ``User``
     class and a column expression are passed.

    :return: the test class, with ``__name__`` set to
     ``ExplicitJoinTest`` followed by ``Inline`` or ``Separate`` and
     then ``String`` or ``Literal``.

    """

    class ExplicitJoinTest(fixtures.MappedTest):
        @classmethod
        def define_tables(cls, metadata):
            # the mapped classes are published as module globals so that
            # insert_data() and the test method can refer to them
            global User, Address
            Base = decl.declarative_base(metadata=metadata)

            class User(Base, fixtures.ComparableEntity):

                __tablename__ = "users"
                id = Column(
                    Integer, primary_key=True, test_needs_autoincrement=True
                )
                name = Column(String(50))

            class Address(Base, fixtures.ComparableEntity):

                __tablename__ = "addresses"
                id = Column(
                    Integer, primary_key=True, test_needs_autoincrement=True
                )
                email = Column(String(50))
                user_id = Column(Integer, ForeignKey("users.id"))
                if inline:
                    if stringbased:
                        user = relationship(
                            "User",
                            primaryjoin="User.id==Address.user_id",
                            backref="addresses",
                        )
                    else:
                        # "user_id" is the Column assigned just above
                        user = relationship(
                            User,
                            primaryjoin=User.id == user_id,
                            backref="addresses",
                        )

            if not inline:
                # configure first, then attach the relationship to the
                # already-declared class
                configure_mappers()
                if stringbased:
                    Address.user = relationship(
                        "User",
                        primaryjoin="User.id==Address.user_id",
                        backref="addresses",
                    )
                else:
                    Address.user = relationship(
                        User,
                        primaryjoin=User.id == Address.user_id,
                        backref="addresses",
                    )

        @classmethod
        def insert_data(cls, connection):
            user_rows = [
                dict(zip(("id", "name"), column_values))
                for column_values in [
                    (7, "jack"),
                    (8, "ed"),
                    (9, "fred"),
                    (10, "chuck"),
                ]
            ]
            address_rows = [
                dict(zip(("id", "user_id", "email"), column_values))
                for column_values in [
                    (1, 7, "jack@bean.com"),
                    (2, 8, "ed@wood.com"),
                    (3, 8, "ed@bettyboop.com"),
                    (4, 8, "ed@lala.com"),
                    (5, 9, "fred@fred.com"),
                ]
            ]

            connection.execute(User.__table__.insert(), user_rows)
            connection.execute(Address.__table__.insert(), address_rows)

        def test_aliased_join(self):

            # this query will screw up if the aliasing enabled in
            # query.join() gets applied to the right half of the join
            # condition inside the any(). the join condition inside of
            # any() comes from the "primaryjoin" of the relationship,
            # and should not be annotated with _orm_adapt.
            # PropertyLoader.Comparator will annotate the left side with
            # _orm_adapt, though.

            sess = create_session()
            eq_(
                sess.query(User)
                .join(User.addresses, aliased=True)
                .filter(Address.email == "ed@wood.com")
                .filter(User.addresses.any(Address.email == "jack@bean.com"))
                .all(),
                [],
            )

    ExplicitJoinTest.__name__ = "ExplicitJoinTest%s%s" % (
        "Inline" if inline else "Separate",
        "String" if stringbased else "Literal",
    )
    return ExplicitJoinTest
2316
2317
# Generate one ExplicitJoinTest variant per (inline, stringbased) pairing
# and bind each into the module namespace under its generated class name.
for inline in True, False:
    for stringbased in True, False:
        testclass = _produce_test(inline, stringbased)
        # assign into the module namespace directly rather than exec()'ing
        # a formatted assignment statement
        globals()[testclass.__name__] = testclass
        del testclass
2323