1from sqlalchemy import Integer, String, ForeignKey, func, desc, and_, or_
2from sqlalchemy.orm import interfaces, relationship, mapper, \
3    clear_mappers, create_session, joinedload, joinedload_all, \
4    subqueryload, subqueryload_all, polymorphic_union, aliased,\
5    class_mapper
6from sqlalchemy import exc as sa_exc
7from sqlalchemy.engine import default
8
9from sqlalchemy.testing import AssertsCompiledSQL, fixtures
10from sqlalchemy import testing
11from sqlalchemy.testing.schema import Table, Column
12from sqlalchemy.testing import assert_raises, eq_
13
14class Company(fixtures.ComparableEntity):
15    pass
16class Person(fixtures.ComparableEntity):
17    pass
18class Engineer(Person):
19    pass
20class Manager(Person):
21    pass
22class Boss(Manager):
23    pass
24class Machine(fixtures.ComparableEntity):
25    pass
26class MachineType(fixtures.ComparableEntity):
27    pass
28class Paperwork(fixtures.ComparableEntity):
29    pass
30
31
32class _PolymorphicFixtureBase(fixtures.MappedTest, AssertsCompiledSQL):
33    run_inserts = 'once'
34    run_setup_mappers = 'once'
35    run_deletes = None
36
37    @classmethod
38    def define_tables(cls, metadata):
39        global people, engineers, managers, boss
40        global companies, paperwork, machines
41
42        companies = Table('companies', metadata,
43            Column('company_id', Integer,
44                primary_key=True,
45                test_needs_autoincrement=True),
46            Column('name', String(50)))
47
48        people = Table('people', metadata,
49            Column('person_id', Integer,
50                primary_key=True,
51                test_needs_autoincrement=True),
52            Column('company_id', Integer,
53                ForeignKey('companies.company_id')),
54            Column('name', String(50)),
55            Column('type', String(30)))
56
57        engineers = Table('engineers', metadata,
58            Column('person_id', Integer,
59                ForeignKey('people.person_id'),
60                primary_key=True),
61            Column('status', String(30)),
62            Column('engineer_name', String(50)),
63            Column('primary_language', String(50)))
64
65        machines = Table('machines', metadata,
66             Column('machine_id',
67                 Integer, primary_key=True,
68                 test_needs_autoincrement=True),
69             Column('name', String(50)),
70             Column('engineer_id', Integer,
71            ForeignKey('engineers.person_id')))
72
73        managers = Table('managers', metadata,
74            Column('person_id', Integer,
75                ForeignKey('people.person_id'),
76                primary_key=True),
77            Column('status', String(30)),
78            Column('manager_name', String(50)))
79
80        boss = Table('boss', metadata,
81            Column('boss_id', Integer,
82                ForeignKey('managers.person_id'),
83                primary_key=True),
84            Column('golf_swing', String(30)))
85
86        paperwork = Table('paperwork', metadata,
87            Column('paperwork_id', Integer,
88                primary_key=True,
89                test_needs_autoincrement=True),
90            Column('description', String(50)),
91            Column('person_id', Integer,
92                ForeignKey('people.person_id')))
93
94    @classmethod
95    def insert_data(cls):
96
97        cls.e1 = e1 = Engineer(
98            name="dilbert",
99            engineer_name="dilbert",
100            primary_language="java",
101            status="regular engineer",
102            paperwork=[
103                Paperwork(description="tps report #1"),
104                Paperwork(description="tps report #2")],
105            machines=[
106                Machine(name='IBM ThinkPad'),
107                Machine(name='IPhone')])
108
109        cls.e2 = e2 = Engineer(
110            name="wally",
111            engineer_name="wally",
112            primary_language="c++",
113            status="regular engineer",
114            paperwork=[
115                Paperwork(description="tps report #3"),
116                Paperwork(description="tps report #4")],
117            machines=[Machine(name="Commodore 64")])
118
119        cls.b1 = b1 = Boss(
120            name="pointy haired boss",
121            golf_swing="fore",
122            manager_name="pointy",
123            status="da boss",
124            paperwork=[Paperwork(description="review #1")])
125
126        cls.m1 = m1 = Manager(
127            name="dogbert",
128            manager_name="dogbert",
129            status="regular manager",
130            paperwork=[
131                Paperwork(description="review #2"),
132                Paperwork(description="review #3")])
133
134        cls.e3 = e3 = Engineer(
135            name="vlad",
136            engineer_name="vlad",
137            primary_language="cobol",
138            status="elbonian engineer",
139            paperwork=[
140                Paperwork(description='elbonian missive #3')],
141            machines=[
142                Machine(name="Commodore 64"),
143                Machine(name="IBM 3270")])
144
145        cls.c1 = c1 = Company(name="MegaCorp, Inc.")
146        c1.employees = [e1, e2, b1, m1]
147        cls.c2 = c2 = Company(name="Elbonia, Inc.")
148        c2.employees = [e3]
149
150        sess = create_session()
151        sess.add(c1)
152        sess.add(c2)
153        sess.flush()
154        sess.expunge_all()
155
156        cls.all_employees = [e1, e2, b1, m1, e3]
157        cls.c1_employees = [e1, e2, b1, m1]
158        cls.c2_employees = [e3]
159
160    def _company_with_emps_machines_fixture(self):
161        fixture = self._company_with_emps_fixture()
162        fixture[0].employees[0].machines = [
163            Machine(name="IBM ThinkPad"),
164            Machine(name="IPhone"),
165        ]
166        fixture[0].employees[1].machines = [
167            Machine(name="Commodore 64")
168        ]
169        return fixture
170
171    def _company_with_emps_fixture(self):
172        return [
173            Company(
174                name="MegaCorp, Inc.",
175                employees=[
176                    Engineer(
177                        name="dilbert",
178                        engineer_name="dilbert",
179                        primary_language="java",
180                        status="regular engineer"
181                    ),
182                    Engineer(
183                        name="wally",
184                        engineer_name="wally",
185                        primary_language="c++",
186                        status="regular engineer"),
187                    Boss(
188                        name="pointy haired boss",
189                        golf_swing="fore",
190                        manager_name="pointy",
191                        status="da boss"),
192                    Manager(
193                        name="dogbert",
194                        manager_name="dogbert",
195                        status="regular manager"),
196                ]),
197            Company(
198                name="Elbonia, Inc.",
199                employees=[
200                    Engineer(
201                        name="vlad",
202                        engineer_name="vlad",
203                        primary_language="cobol",
204                        status="elbonian engineer")
205                ])
206        ]
207
208    def _emps_wo_relationships_fixture(self):
209        return [
210            Engineer(
211                name="dilbert",
212                engineer_name="dilbert",
213                primary_language="java",
214                status="regular engineer"),
215            Engineer(
216                name="wally",
217                engineer_name="wally",
218                primary_language="c++",
219                status="regular engineer"),
220            Boss(
221                name="pointy haired boss",
222                golf_swing="fore",
223                manager_name="pointy",
224                status="da boss"),
225            Manager(
226                name="dogbert",
227                manager_name="dogbert",
228                status="regular manager"),
229            Engineer(
230                name="vlad",
231                engineer_name="vlad",
232                primary_language="cobol",
233                status="elbonian engineer")
234        ]
235
236    @classmethod
237    def setup_mappers(cls):
238        mapper(Company, companies,
239            properties={
240                'employees':relationship(
241                    Person,
242                    order_by=people.c.person_id)})
243
244        mapper(Machine, machines)
245
246        person_with_polymorphic,\
247            manager_with_polymorphic = cls._get_polymorphics()
248
249        mapper(Person, people,
250            with_polymorphic=person_with_polymorphic,
251            polymorphic_on=people.c.type,
252            polymorphic_identity='person',
253            order_by=people.c.person_id,
254            properties={
255                'paperwork':relationship(
256                    Paperwork,
257                    order_by=paperwork.c.paperwork_id)})
258
259        mapper(Engineer, engineers,
260            inherits=Person,
261            polymorphic_identity='engineer',
262            properties={
263                'machines':relationship(
264                    Machine,
265                    order_by=machines.c.machine_id)})
266
267        mapper(Manager, managers,
268            with_polymorphic=manager_with_polymorphic,
269            inherits=Person,
270            polymorphic_identity='manager')
271
272        mapper(Boss, boss,
273            inherits=Manager,
274            polymorphic_identity='boss')
275
276        mapper(Paperwork, paperwork)
277
278class _Polymorphic(_PolymorphicFixtureBase):
279    select_type = ""
280    @classmethod
281    def _get_polymorphics(cls):
282        return None, None
283
284class _PolymorphicPolymorphic(_PolymorphicFixtureBase):
285    select_type = "Polymorphic"
286    @classmethod
287    def _get_polymorphics(cls):
288        return '*', '*'
289
290
291class _PolymorphicUnions(_PolymorphicFixtureBase):
292    select_type = "Unions"
293
294    @classmethod
295    def _get_polymorphics(cls):
296        people, engineers, managers, boss = \
297            cls.tables.people, cls.tables.engineers, \
298            cls.tables.managers, cls.tables.boss
299        person_join = polymorphic_union({
300                'engineer':people.join(engineers),
301                'manager':people.join(managers)},
302            None, 'pjoin')
303        manager_join = people.join(managers).outerjoin(boss)
304        person_with_polymorphic = (
305            [Person, Manager, Engineer], person_join)
306        manager_with_polymorphic = ('*', manager_join)
307        return person_with_polymorphic,\
308            manager_with_polymorphic
309
310
311class _PolymorphicAliasedJoins(_PolymorphicFixtureBase):
312    select_type = "AliasedJoins"
313
314    @classmethod
315    def _get_polymorphics(cls):
316        people, engineers, managers, boss = \
317            cls.tables.people, cls.tables.engineers, \
318            cls.tables.managers, cls.tables.boss
319        person_join = people \
320            .outerjoin(engineers) \
321            .outerjoin(managers) \
322            .select(use_labels=True) \
323            .alias('pjoin')
324        manager_join = people \
325            .join(managers) \
326            .outerjoin(boss) \
327            .select(use_labels=True) \
328            .alias('mjoin')
329        person_with_polymorphic = (
330            [Person, Manager, Engineer], person_join)
331        manager_with_polymorphic = ('*', manager_join)
332        return person_with_polymorphic,\
333            manager_with_polymorphic
334
335
336class _PolymorphicJoins(_PolymorphicFixtureBase):
337    select_type = "Joins"
338
339    @classmethod
340    def _get_polymorphics(cls):
341        people, engineers, managers, boss = \
342            cls.tables.people, cls.tables.engineers, \
343            cls.tables.managers, cls.tables.boss
344        person_join = people.outerjoin(engineers).outerjoin(managers)
345        manager_join = people.join(managers).outerjoin(boss)
346        person_with_polymorphic = (
347            [Person, Manager, Engineer], person_join)
348        manager_with_polymorphic = ('*', manager_join)
349        return person_with_polymorphic,\
350            manager_with_polymorphic
351