1from sqlalchemy import ForeignKey
2from sqlalchemy import inspect
3from sqlalchemy import Integer
4from sqlalchemy import String
5from sqlalchemy import testing
6from sqlalchemy.orm import Bundle
7from sqlalchemy.orm import configure_mappers
8from sqlalchemy.orm import defaultload
9from sqlalchemy.orm import defer
10from sqlalchemy.orm import joinedload
11from sqlalchemy.orm import Load
12from sqlalchemy.orm import mapper
13from sqlalchemy.orm import relationship
14from sqlalchemy.orm import selectinload
15from sqlalchemy.orm import Session
16from sqlalchemy.orm import sessionmaker
17from sqlalchemy.testing import fixtures
18from sqlalchemy.testing import profiling
19from sqlalchemy.testing.schema import Column
20from sqlalchemy.testing.schema import Table
21
22
23class MergeTest(fixtures.MappedTest):
24    @classmethod
25    def define_tables(cls, metadata):
26        Table(
27            "parent",
28            metadata,
29            Column(
30                "id", Integer, primary_key=True, test_needs_autoincrement=True
31            ),
32            Column("data", String(20)),
33        )
34        Table(
35            "child",
36            metadata,
37            Column(
38                "id", Integer, primary_key=True, test_needs_autoincrement=True
39            ),
40            Column("data", String(20)),
41            Column(
42                "parent_id", Integer, ForeignKey("parent.id"), nullable=False
43            ),
44        )
45
46    @classmethod
47    def setup_classes(cls):
48        class Parent(cls.Basic):
49            pass
50
51        class Child(cls.Basic):
52            pass
53
54    @classmethod
55    def setup_mappers(cls):
56        Child, Parent, parent, child = (
57            cls.classes.Child,
58            cls.classes.Parent,
59            cls.tables.parent,
60            cls.tables.child,
61        )
62
63        mapper(
64            Parent,
65            parent,
66            properties={"children": relationship(Child, backref="parent")},
67        )
68        mapper(Child, child)
69
70    @classmethod
71    def insert_data(cls):
72        parent, child = cls.tables.parent, cls.tables.child
73
74        parent.insert().execute({"id": 1, "data": "p1"})
75        child.insert().execute({"id": 1, "data": "p1c1", "parent_id": 1})
76
77    def test_merge_no_load(self):
78        Parent = self.classes.Parent
79
80        sess = sessionmaker()()
81        sess2 = sessionmaker()()
82        p1 = sess.query(Parent).get(1)
83        p1.children
84
85        # down from 185 on this this is a small slice of a usually
86        # bigger operation so using a small variance
87
88        @profiling.function_call_count(variance=0.10)
89        def go1():
90            return sess2.merge(p1, load=False)
91
92        p2 = go1()
93
94        # third call, merge object already present. almost no calls.
95
96        @profiling.function_call_count(variance=0.10)
97        def go2():
98            return sess2.merge(p2, load=False)
99
100        go2()
101
102    def test_merge_load(self):
103        Parent = self.classes.Parent
104
105        sess = sessionmaker()()
106        sess2 = sessionmaker()()
107        p1 = sess.query(Parent).get(1)
108        p1.children
109
110        # preloading of collection took this down from 1728 to 1192
111        # using sqlite3 the C extension took it back up to approx. 1257
112        # (py2.6)
113
114        @profiling.function_call_count()
115        def go():
116            sess2.merge(p1)
117
118        go()
119
120        # one more time, count the SQL
121
122        def go2():
123            sess2.merge(p1)
124
125        sess2 = sessionmaker(testing.db)()
126        self.assert_sql_count(testing.db, go2, 2)
127
128
129class LoadManyToOneFromIdentityTest(fixtures.MappedTest):
130
131    """test overhead associated with many-to-one fetches.
132
133    Prior to the refactor of LoadLazyAttribute and
134    query._get(), the load from identity map took 2x
135    as many calls (65K calls here instead of around 33K)
136    to load 1000 related objects from the identity map.
137
138    """
139
140    @classmethod
141    def define_tables(cls, metadata):
142        Table(
143            "parent",
144            metadata,
145            Column("id", Integer, primary_key=True),
146            Column("data", String(20)),
147            Column("child_id", Integer, ForeignKey("child.id")),
148        )
149
150        Table(
151            "child",
152            metadata,
153            Column("id", Integer, primary_key=True),
154            Column("data", String(20)),
155        )
156
157    @classmethod
158    def setup_classes(cls):
159        class Parent(cls.Basic):
160            pass
161
162        class Child(cls.Basic):
163            pass
164
165    @classmethod
166    def setup_mappers(cls):
167        Child, Parent, parent, child = (
168            cls.classes.Child,
169            cls.classes.Parent,
170            cls.tables.parent,
171            cls.tables.child,
172        )
173
174        mapper(Parent, parent, properties={"child": relationship(Child)})
175        mapper(Child, child)
176
177    @classmethod
178    def insert_data(cls):
179        parent, child = cls.tables.parent, cls.tables.child
180
181        child.insert().execute(
182            [{"id": i, "data": "c%d" % i} for i in range(1, 251)]
183        )
184        parent.insert().execute(
185            [
186                {
187                    "id": i,
188                    "data": "p%dc%d" % (i, (i % 250) + 1),
189                    "child_id": (i % 250) + 1,
190                }
191                for i in range(1, 1000)
192            ]
193        )
194
195    def test_many_to_one_load_no_identity(self):
196        Parent = self.classes.Parent
197
198        sess = Session()
199        parents = sess.query(Parent).all()
200
201        @profiling.function_call_count(variance=0.2)
202        def go():
203            for p in parents:
204                p.child
205
206        go()
207
208    def test_many_to_one_load_identity(self):
209        Parent, Child = self.classes.Parent, self.classes.Child
210
211        sess = Session()
212        parents = sess.query(Parent).all()
213        children = sess.query(Child).all()
214        children  # strong reference
215
216        @profiling.function_call_count()
217        def go():
218            for p in parents:
219                p.child
220
221        go()
222
223
224class MergeBackrefsTest(fixtures.MappedTest):
225    @classmethod
226    def define_tables(cls, metadata):
227        Table(
228            "a",
229            metadata,
230            Column("id", Integer, primary_key=True),
231            Column("c_id", Integer, ForeignKey("c.id")),
232        )
233        Table(
234            "b",
235            metadata,
236            Column("id", Integer, primary_key=True),
237            Column("a_id", Integer, ForeignKey("a.id")),
238        )
239        Table("c", metadata, Column("id", Integer, primary_key=True))
240        Table(
241            "d",
242            metadata,
243            Column("id", Integer, primary_key=True),
244            Column("a_id", Integer, ForeignKey("a.id")),
245        )
246
247    @classmethod
248    def setup_classes(cls):
249        class A(cls.Basic):
250            pass
251
252        class B(cls.Basic):
253            pass
254
255        class C(cls.Basic):
256            pass
257
258        class D(cls.Basic):
259            pass
260
261    @classmethod
262    def setup_mappers(cls):
263        A, B, C, D = cls.classes.A, cls.classes.B, cls.classes.C, cls.classes.D
264        a, b, c, d = cls.tables.a, cls.tables.b, cls.tables.c, cls.tables.d
265        mapper(
266            A,
267            a,
268            properties={
269                "bs": relationship(B, backref="a"),
270                "c": relationship(C, backref="as"),
271                "ds": relationship(D, backref="a"),
272            },
273        )
274        mapper(B, b)
275        mapper(C, c)
276        mapper(D, d)
277
278    @classmethod
279    def insert_data(cls):
280        A, B, C, D = cls.classes.A, cls.classes.B, cls.classes.C, cls.classes.D
281        s = Session()
282        s.add_all(
283            [
284                A(
285                    id=i,
286                    bs=[B(id=(i * 5) + j) for j in range(1, 5)],
287                    c=C(id=i),
288                    ds=[D(id=(i * 5) + j) for j in range(1, 5)],
289                )
290                for i in range(1, 5)
291            ]
292        )
293        s.commit()
294
295    @profiling.function_call_count(variance=0.10)
296    def test_merge_pending_with_all_pks(self):
297        A, B, C, D = (
298            self.classes.A,
299            self.classes.B,
300            self.classes.C,
301            self.classes.D,
302        )
303        s = Session()
304        for a in [
305            A(
306                id=i,
307                bs=[B(id=(i * 5) + j) for j in range(1, 5)],
308                c=C(id=i),
309                ds=[D(id=(i * 5) + j) for j in range(1, 5)],
310            )
311            for i in range(1, 5)
312        ]:
313            s.merge(a)
314
315
316class DeferOptionsTest(fixtures.MappedTest):
317    @classmethod
318    def define_tables(cls, metadata):
319        Table(
320            "a",
321            metadata,
322            Column("id", Integer, primary_key=True),
323            Column("x", String(5)),
324            Column("y", String(5)),
325            Column("z", String(5)),
326            Column("q", String(5)),
327            Column("p", String(5)),
328            Column("r", String(5)),
329        )
330
331    @classmethod
332    def setup_classes(cls):
333        class A(cls.Basic):
334            pass
335
336    @classmethod
337    def setup_mappers(cls):
338        A = cls.classes.A
339        a = cls.tables.a
340        mapper(A, a)
341
342    @classmethod
343    def insert_data(cls):
344        A = cls.classes.A
345        s = Session()
346        s.add_all(
347            [
348                A(
349                    id=i,
350                    **dict(
351                        (letter, "%s%d" % (letter, i))
352                        for letter in ["x", "y", "z", "p", "q", "r"]
353                    )
354                )
355                for i in range(1, 1001)
356            ]
357        )
358        s.commit()
359
360    @profiling.function_call_count(variance=0.10)
361    def test_baseline(self):
362        # as of [ticket:2778], this is at 39025
363        A = self.classes.A
364        s = Session()
365        s.query(A).all()
366
367    @profiling.function_call_count(variance=0.10)
368    def test_defer_many_cols(self):
369        # with [ticket:2778], this goes from 50805 to 32817,
370        # as it should be fewer function calls than the baseline
371        A = self.classes.A
372        s = Session()
373        s.query(A).options(
374            *[defer(letter) for letter in ["x", "y", "z", "p", "q", "r"]]
375        ).all()
376
377
378class AttributeOverheadTest(fixtures.MappedTest):
379    @classmethod
380    def define_tables(cls, metadata):
381        Table(
382            "parent",
383            metadata,
384            Column(
385                "id", Integer, primary_key=True, test_needs_autoincrement=True
386            ),
387            Column("data", String(20)),
388        )
389        Table(
390            "child",
391            metadata,
392            Column(
393                "id", Integer, primary_key=True, test_needs_autoincrement=True
394            ),
395            Column("data", String(20)),
396            Column(
397                "parent_id", Integer, ForeignKey("parent.id"), nullable=False
398            ),
399        )
400
401    @classmethod
402    def setup_classes(cls):
403        class Parent(cls.Basic):
404            pass
405
406        class Child(cls.Basic):
407            pass
408
409    @classmethod
410    def setup_mappers(cls):
411        Child, Parent, parent, child = (
412            cls.classes.Child,
413            cls.classes.Parent,
414            cls.tables.parent,
415            cls.tables.child,
416        )
417
418        mapper(
419            Parent,
420            parent,
421            properties={"children": relationship(Child, backref="parent")},
422        )
423        mapper(Child, child)
424
425    def test_attribute_set(self):
426        Parent, Child = self.classes.Parent, self.classes.Child
427        p1 = Parent()
428        c1 = Child()
429
430        @profiling.function_call_count()
431        def go():
432            for i in range(30):
433                c1.parent = p1
434                c1.parent = None
435                c1.parent = p1
436                del c1.parent
437
438        go()
439
440    def test_collection_append_remove(self):
441        Parent, Child = self.classes.Parent, self.classes.Child
442        p1 = Parent()
443        children = [Child() for i in range(100)]
444
445        @profiling.function_call_count()
446        def go():
447            for child in children:
448                p1.children.append(child)
449            for child in children:
450                p1.children.remove(child)
451
452        go()
453
454
455class SessionTest(fixtures.MappedTest):
456    @classmethod
457    def define_tables(cls, metadata):
458        Table(
459            "parent",
460            metadata,
461            Column(
462                "id", Integer, primary_key=True, test_needs_autoincrement=True
463            ),
464            Column("data", String(20)),
465        )
466        Table(
467            "child",
468            metadata,
469            Column(
470                "id", Integer, primary_key=True, test_needs_autoincrement=True
471            ),
472            Column("data", String(20)),
473            Column(
474                "parent_id", Integer, ForeignKey("parent.id"), nullable=False
475            ),
476        )
477
478    @classmethod
479    def setup_classes(cls):
480        class Parent(cls.Basic):
481            pass
482
483        class Child(cls.Basic):
484            pass
485
486    @classmethod
487    def setup_mappers(cls):
488        Child, Parent, parent, child = (
489            cls.classes.Child,
490            cls.classes.Parent,
491            cls.tables.parent,
492            cls.tables.child,
493        )
494
495        mapper(
496            Parent,
497            parent,
498            properties={"children": relationship(Child, backref="parent")},
499        )
500        mapper(Child, child)
501
502    def test_expire_lots(self):
503        Parent, Child = self.classes.Parent, self.classes.Child
504        obj = [
505            Parent(children=[Child() for j in range(10)]) for i in range(10)
506        ]
507
508        sess = Session()
509        sess.add_all(obj)
510        sess.flush()
511
512        @profiling.function_call_count()
513        def go():
514            sess.expire_all()
515
516        go()
517
518
519class QueryTest(fixtures.MappedTest):
520    @classmethod
521    def define_tables(cls, metadata):
522        Table(
523            "parent",
524            metadata,
525            Column(
526                "id", Integer, primary_key=True, test_needs_autoincrement=True
527            ),
528            Column("data1", String(20)),
529            Column("data2", String(20)),
530            Column("data3", String(20)),
531            Column("data4", String(20)),
532        )
533
534    @classmethod
535    def setup_classes(cls):
536        class Parent(cls.Basic):
537            pass
538
539    @classmethod
540    def setup_mappers(cls):
541        Parent = cls.classes.Parent
542        parent = cls.tables.parent
543
544        mapper(Parent, parent)
545
546    def _fixture(self):
547        Parent = self.classes.Parent
548        sess = Session()
549        sess.add_all(
550            [
551                Parent(data1="d1", data2="d2", data3="d3", data4="d4")
552                for i in range(10)
553            ]
554        )
555        sess.commit()
556        sess.close()
557
558    def test_query_cols(self):
559        Parent = self.classes.Parent
560        self._fixture()
561        sess = Session()
562
563        @profiling.function_call_count()
564        def go():
565            for i in range(10):
566                q = sess.query(
567                    Parent.data1, Parent.data2, Parent.data3, Parent.data4
568                )
569
570                q.all()
571
572        go()
573
574
575class SelectInEagerLoadTest(fixtures.MappedTest):
576    """basic test for selectin() loading, which uses a baked query.
577
578    if the baked query starts spoiling due to some bug in cache keys,
579    this callcount blows up.
580
581    """
582
583    @classmethod
584    def define_tables(cls, metadata):
585
586        Table(
587            "a",
588            metadata,
589            Column(
590                "id", Integer, primary_key=True, test_needs_autoincrement=True
591            ),
592            Column("x", Integer),
593            Column("y", Integer),
594        )
595        Table(
596            "b",
597            metadata,
598            Column(
599                "id", Integer, primary_key=True, test_needs_autoincrement=True
600            ),
601            Column("a_id", ForeignKey("a.id")),
602            Column("x", Integer),
603            Column("y", Integer),
604        )
605        Table(
606            "c",
607            metadata,
608            Column(
609                "id", Integer, primary_key=True, test_needs_autoincrement=True
610            ),
611            Column("b_id", ForeignKey("b.id")),
612            Column("x", Integer),
613            Column("y", Integer),
614        )
615
616    @classmethod
617    def setup_classes(cls):
618        class A(cls.Basic):
619            pass
620
621        class B(cls.Basic):
622            pass
623
624        class C(cls.Basic):
625            pass
626
627    @classmethod
628    def setup_mappers(cls):
629        A, B, C = cls.classes("A", "B", "C")
630        a, b, c = cls.tables("a", "b", "c")
631
632        mapper(A, a, properties={"bs": relationship(B)})
633        mapper(B, b, properties={"cs": relationship(C)})
634        mapper(C, c)
635
636    @classmethod
637    def insert_data(cls):
638        A, B, C = cls.classes("A", "B", "C")
639        s = Session()
640        s.add(A(bs=[B(cs=[C()]), B(cs=[C()])]))
641        s.commit()
642
643    def test_round_trip_results(self):
644        A, B, C = self.classes("A", "B", "C")
645
646        sess = Session()
647
648        q = sess.query(A).options(selectinload(A.bs).selectinload(B.cs))
649
650        @profiling.function_call_count()
651        def go():
652            for i in range(100):
653                obj = q.all()
654                list(obj)
655                sess.close()
656
657        go()
658
659
660class JoinedEagerLoadTest(fixtures.MappedTest):
661    @classmethod
662    def define_tables(cls, metadata):
663        def make_some_columns():
664            return [Column("c%d" % i, Integer) for i in range(10)]
665
666        Table(
667            "a",
668            metadata,
669            Column(
670                "id", Integer, primary_key=True, test_needs_autoincrement=True
671            ),
672            *make_some_columns()
673        )
674        Table(
675            "b",
676            metadata,
677            Column(
678                "id", Integer, primary_key=True, test_needs_autoincrement=True
679            ),
680            Column("a_id", ForeignKey("a.id")),
681            *make_some_columns()
682        )
683        Table(
684            "c",
685            metadata,
686            Column(
687                "id", Integer, primary_key=True, test_needs_autoincrement=True
688            ),
689            Column("b_id", ForeignKey("b.id")),
690            *make_some_columns()
691        )
692        Table(
693            "d",
694            metadata,
695            Column(
696                "id", Integer, primary_key=True, test_needs_autoincrement=True
697            ),
698            Column("c_id", ForeignKey("c.id")),
699            *make_some_columns()
700        )
701        Table(
702            "e",
703            metadata,
704            Column(
705                "id", Integer, primary_key=True, test_needs_autoincrement=True
706            ),
707            Column("a_id", ForeignKey("a.id")),
708            *make_some_columns()
709        )
710        Table(
711            "f",
712            metadata,
713            Column(
714                "id", Integer, primary_key=True, test_needs_autoincrement=True
715            ),
716            Column("e_id", ForeignKey("e.id")),
717            *make_some_columns()
718        )
719        Table(
720            "g",
721            metadata,
722            Column(
723                "id", Integer, primary_key=True, test_needs_autoincrement=True
724            ),
725            Column("e_id", ForeignKey("e.id")),
726            *make_some_columns()
727        )
728
729    @classmethod
730    def setup_classes(cls):
731        class A(cls.Basic):
732            pass
733
734        class B(cls.Basic):
735            pass
736
737        class C(cls.Basic):
738            pass
739
740        class D(cls.Basic):
741            pass
742
743        class E(cls.Basic):
744            pass
745
746        class F(cls.Basic):
747            pass
748
749        class G(cls.Basic):
750            pass
751
752    @classmethod
753    def setup_mappers(cls):
754        A, B, C, D, E, F, G = cls.classes("A", "B", "C", "D", "E", "F", "G")
755        a, b, c, d, e, f, g = cls.tables("a", "b", "c", "d", "e", "f", "g")
756
757        mapper(A, a, properties={"bs": relationship(B), "es": relationship(E)})
758        mapper(B, b, properties={"cs": relationship(C)})
759        mapper(C, c, properties={"ds": relationship(D)})
760        mapper(D, d)
761        mapper(E, e, properties={"fs": relationship(F), "gs": relationship(G)})
762        mapper(F, f)
763        mapper(G, g)
764
765    @classmethod
766    def insert_data(cls):
767        A, B, C, D, E, F, G = cls.classes("A", "B", "C", "D", "E", "F", "G")
768        s = Session()
769        s.add(
770            A(
771                bs=[B(cs=[C(ds=[D()])]), B(cs=[C()])],
772                es=[E(fs=[F()], gs=[G()])],
773            )
774        )
775        s.commit()
776
777    def test_build_query(self):
778        A, B, C, D, E, F, G = self.classes("A", "B", "C", "D", "E", "F", "G")
779
780        sess = Session()
781
782        @profiling.function_call_count()
783        def go():
784            for i in range(100):
785                q = sess.query(A).options(
786                    joinedload(A.bs).joinedload(B.cs).joinedload(C.ds),
787                    joinedload(A.es).joinedload(E.fs),
788                    defaultload(A.es).joinedload(E.gs),
789                )
790                q._compile_context()
791
792        go()
793
794    def test_fetch_results(self):
795        A, B, C, D, E, F, G = self.classes("A", "B", "C", "D", "E", "F", "G")
796
797        sess = Session()
798
799        q = sess.query(A).options(
800            joinedload(A.bs).joinedload(B.cs).joinedload(C.ds),
801            joinedload(A.es).joinedload(E.fs),
802            defaultload(A.es).joinedload(E.gs),
803        )
804
805        context = q._compile_context()
806
807        @profiling.function_call_count()
808        def go():
809            for i in range(100):
810                obj = q._execute_and_instances(context)
811                list(obj)
812                sess.close()
813
814        go()
815
816
817class BranchedOptionTest(fixtures.MappedTest):
818    @classmethod
819    def define_tables(cls, metadata):
820        def make_some_columns():
821            return [Column("c%d" % i, Integer) for i in range(2)]
822
823        Table(
824            "a",
825            metadata,
826            Column(
827                "id", Integer, primary_key=True, test_needs_autoincrement=True
828            ),
829            *make_some_columns()
830        )
831        Table(
832            "b",
833            metadata,
834            Column(
835                "id", Integer, primary_key=True, test_needs_autoincrement=True
836            ),
837            Column("a_id", ForeignKey("a.id")),
838            *make_some_columns()
839        )
840        Table(
841            "c",
842            metadata,
843            Column(
844                "id", Integer, primary_key=True, test_needs_autoincrement=True
845            ),
846            Column("b_id", ForeignKey("b.id")),
847            *make_some_columns()
848        )
849        Table(
850            "d",
851            metadata,
852            Column(
853                "id", Integer, primary_key=True, test_needs_autoincrement=True
854            ),
855            Column("b_id", ForeignKey("b.id")),
856            *make_some_columns()
857        )
858        Table(
859            "e",
860            metadata,
861            Column(
862                "id", Integer, primary_key=True, test_needs_autoincrement=True
863            ),
864            Column("b_id", ForeignKey("b.id")),
865            *make_some_columns()
866        )
867        Table(
868            "f",
869            metadata,
870            Column(
871                "id", Integer, primary_key=True, test_needs_autoincrement=True
872            ),
873            Column("b_id", ForeignKey("b.id")),
874            *make_some_columns()
875        )
876        Table(
877            "g",
878            metadata,
879            Column(
880                "id", Integer, primary_key=True, test_needs_autoincrement=True
881            ),
882            Column("a_id", ForeignKey("a.id")),
883            *make_some_columns()
884        )
885
886    @classmethod
887    def setup_classes(cls):
888        class A(cls.Basic):
889            pass
890
891        class B(cls.Basic):
892            pass
893
894        class C(cls.Basic):
895            pass
896
897        class D(cls.Basic):
898            pass
899
900        class E(cls.Basic):
901            pass
902
903        class F(cls.Basic):
904            pass
905
906        class G(cls.Basic):
907            pass
908
909    @classmethod
910    def setup_mappers(cls):
911        A, B, C, D, E, F, G = cls.classes("A", "B", "C", "D", "E", "F", "G")
912        a, b, c, d, e, f, g = cls.tables("a", "b", "c", "d", "e", "f", "g")
913
914        mapper(A, a, properties={"bs": relationship(B), "gs": relationship(G)})
915        mapper(
916            B,
917            b,
918            properties={
919                "cs": relationship(C),
920                "ds": relationship(D),
921                "es": relationship(E),
922                "fs": relationship(F),
923            },
924        )
925        mapper(C, c)
926        mapper(D, d)
927        mapper(E, e)
928        mapper(F, f)
929        mapper(G, g)
930
931        configure_mappers()
932
933    def test_generate_cache_key_unbound_branching(self):
934        A, B, C, D, E, F, G = self.classes("A", "B", "C", "D", "E", "F", "G")
935
936        base = joinedload(A.bs)
937        opts = [
938            base.joinedload(B.cs),
939            base.joinedload(B.ds),
940            base.joinedload(B.es),
941            base.joinedload(B.fs),
942        ]
943
944        cache_path = inspect(A)._path_registry
945
946        @profiling.function_call_count()
947        def go():
948            for opt in opts:
949                opt._generate_cache_key(cache_path)
950
951        go()
952
953    def test_generate_cache_key_bound_branching(self):
954        A, B, C, D, E, F, G = self.classes("A", "B", "C", "D", "E", "F", "G")
955
956        base = Load(A).joinedload(A.bs)
957        opts = [
958            base.joinedload(B.cs),
959            base.joinedload(B.ds),
960            base.joinedload(B.es),
961            base.joinedload(B.fs),
962        ]
963
964        cache_path = inspect(A)._path_registry
965
966        @profiling.function_call_count()
967        def go():
968            for opt in opts:
969                opt._generate_cache_key(cache_path)
970
971        go()
972
973    def test_query_opts_unbound_branching(self):
974        A, B, C, D, E, F, G = self.classes("A", "B", "C", "D", "E", "F", "G")
975
976        base = joinedload(A.bs)
977        opts = [
978            base.joinedload(B.cs),
979            base.joinedload(B.ds),
980            base.joinedload(B.es),
981            base.joinedload(B.fs),
982        ]
983
984        q = Session().query(A)
985
986        @profiling.function_call_count()
987        def go():
988            q.options(*opts)
989
990        go()
991
992    def test_query_opts_key_bound_branching(self):
993        A, B, C, D, E, F, G = self.classes("A", "B", "C", "D", "E", "F", "G")
994
995        base = Load(A).joinedload(A.bs)
996        opts = [
997            base.joinedload(B.cs),
998            base.joinedload(B.ds),
999            base.joinedload(B.es),
1000            base.joinedload(B.fs),
1001        ]
1002
1003        q = Session().query(A)
1004
1005        @profiling.function_call_count()
1006        def go():
1007            q.options(*opts)
1008
1009        go()
1010
1011
1012class AnnotatedOverheadTest(fixtures.MappedTest):
1013    @classmethod
1014    def define_tables(cls, metadata):
1015        Table(
1016            "a",
1017            metadata,
1018            Column("id", Integer, primary_key=True),
1019            Column("data", String(50)),
1020        )
1021
1022    @classmethod
1023    def setup_classes(cls):
1024        class A(cls.Basic):
1025            pass
1026
1027    @classmethod
1028    def setup_mappers(cls):
1029        A = cls.classes.A
1030        a = cls.tables.a
1031
1032        mapper(A, a)
1033
1034    @classmethod
1035    def insert_data(cls):
1036        A = cls.classes.A
1037        s = Session()
1038        s.add_all([A(data="asdf") for i in range(5)])
1039        s.commit()
1040
1041    def test_no_bundle(self):
1042        A = self.classes.A
1043        s = Session()
1044
1045        q = s.query(A).select_from(A)
1046
1047        @profiling.function_call_count()
1048        def go():
1049            for i in range(100):
1050                q.all()
1051
1052        go()
1053
1054    def test_no_entity_wo_annotations(self):
1055        A = self.classes.A
1056        a = self.tables.a
1057        s = Session()
1058
1059        q = s.query(a.c.data).select_from(A)
1060
1061        @profiling.function_call_count()
1062        def go():
1063            for i in range(100):
1064                q.all()
1065
1066        go()
1067
1068    def test_no_entity_w_annotations(self):
1069        A = self.classes.A
1070        s = Session()
1071        q = s.query(A.data).select_from(A)
1072
1073        @profiling.function_call_count()
1074        def go():
1075            for i in range(100):
1076                q.all()
1077
1078        go()
1079
1080    def test_entity_w_annotations(self):
1081        A = self.classes.A
1082        s = Session()
1083        q = s.query(A, A.data).select_from(A)
1084
1085        @profiling.function_call_count()
1086        def go():
1087            for i in range(100):
1088                q.all()
1089
1090        go()
1091
1092    def test_entity_wo_annotations(self):
1093        A = self.classes.A
1094        a = self.tables.a
1095        s = Session()
1096        q = s.query(A, a.c.data).select_from(A)
1097
1098        @profiling.function_call_count()
1099        def go():
1100            for i in range(100):
1101                q.all()
1102
1103        go()
1104
1105    def test_no_bundle_wo_annotations(self):
1106        A = self.classes.A
1107        a = self.tables.a
1108        s = Session()
1109        q = s.query(a.c.data, A).select_from(A)
1110
1111        @profiling.function_call_count()
1112        def go():
1113            for i in range(100):
1114                q.all()
1115
1116        go()
1117
1118    def test_no_bundle_w_annotations(self):
1119        A = self.classes.A
1120        s = Session()
1121        q = s.query(A.data, A).select_from(A)
1122
1123        @profiling.function_call_count()
1124        def go():
1125            for i in range(100):
1126                q.all()
1127
1128        go()
1129
1130    def test_bundle_wo_annotation(self):
1131        A = self.classes.A
1132        a = self.tables.a
1133        s = Session()
1134        q = s.query(Bundle("ASdf", a.c.data), A).select_from(A)
1135
1136        @profiling.function_call_count()
1137        def go():
1138            for i in range(100):
1139                q.all()
1140
1141        go()
1142
1143    def test_bundle_w_annotation(self):
1144        A = self.classes.A
1145        s = Session()
1146        q = s.query(Bundle("ASdf", A.data), A).select_from(A)
1147
1148        @profiling.function_call_count()
1149        def go():
1150            for i in range(100):
1151                q.all()
1152
1153        go()
1154