1import operator
2import re
3
4import sqlalchemy as sa
5from .. import assert_raises_message
6from .. import config
7from .. import engines
8from .. import eq_
9from .. import expect_warnings
10from .. import fixtures
11from .. import is_
12from ..provision import temp_table_keyword_args
13from ..schema import Column
14from ..schema import Table
15from ... import event
16from ... import exc as sa_exc
17from ... import ForeignKey
18from ... import inspect
19from ... import Integer
20from ... import MetaData
21from ... import String
22from ... import testing
23from ... import types as sql_types
24from ...engine.reflection import Inspector
25from ...schema import DDL
26from ...schema import Index
27from ...sql.elements import quoted_name
28from ...testing import is_false
29from ...testing import is_true
30
31
# Module-level placeholders, both initialised to None.
# NOTE(review): neither name is read at module level in the visible part of
# this file (the methods below use their own locals) -- confirm still needed.
metadata, users = None, None
33
34
class HasTableTest(fixtures.TablesTest):
    """Backend tests for the dialect-level ``has_table()`` check.

    ``test_table`` is declared without a schema; ``test_table_s`` is
    declared under ``config.test_schema`` when the ``schemas``
    requirement is enabled.
    """

    __backend__ = True

    @classmethod
    def define_tables(cls, metadata):
        """Declare the fixture tables probed by the tests below."""

        def _columns():
            # build a fresh pair of Column objects for every Table
            return [
                Column("id", Integer, primary_key=True),
                Column("data", String(50)),
            ]

        Table("test_table", metadata, *_columns())
        if testing.requires.schemas.enabled:
            Table(
                "test_table_s",
                metadata,
                *_columns(),
                schema=config.test_schema
            )

    def test_has_table(self):
        """Without a schema argument only ``test_table`` is reported."""
        expectations = (
            ("test_table", is_true),
            ("test_table_s", is_false),
            ("nonexistent_table", is_false),
        )
        with config.db.begin() as conn:
            for table_name, check in expectations:
                check(config.db.dialect.has_table(conn, table_name))

    @testing.requires.schemas
    def test_has_table_schema(self):
        """With ``schema=config.test_schema`` only ``test_table_s`` is
        reported."""
        expectations = (
            ("test_table", is_false),
            ("test_table_s", is_true),
            ("nonexistent_table", is_false),
        )
        with config.db.begin() as conn:
            for table_name, check in expectations:
                check(
                    config.db.dialect.has_table(
                        conn, table_name, schema=config.test_schema
                    )
                )
79
80
class QuotedNameArgumentTest(fixtures.TablesTest):
    """Backend tests that hand names containing quote characters to the
    Inspector methods.

    Each test only checks that the call succeeds and, where asserted,
    returns a truthy result.
    """

    run_create_tables = "once"
    __backend__ = True

    @classmethod
    def define_tables(cls, metadata):
        """Declare tables (and optionally views) whose names and
        constraint names contain single or double quote characters."""
        Table(
            "quote ' one",
            metadata,
            Column("id", Integer),
            Column("name", String(50)),
            Column("data", String(50)),
            Column("related_id", Integer),
            sa.PrimaryKeyConstraint("id", name="pk quote ' one"),
            sa.Index("ix quote ' one", "name"),
            sa.UniqueConstraint("data", name="uq quote' one"),
            sa.ForeignKeyConstraint(
                ["id"], ["related.id"], name="fk quote ' one"
            ),
            sa.CheckConstraint("name != 'foo'", name="ck quote ' one"),
            comment=r"""quote ' one comment""",
            test_needs_fk=True,
        )

        # the double-quote variant is declared only where the backend
        # accepts such symbol names
        if testing.requires.symbol_names_w_double_quote.enabled:
            Table(
                'quote " two',
                metadata,
                Column("id", Integer),
                Column("name", String(50)),
                Column("data", String(50)),
                Column("related_id", Integer),
                sa.PrimaryKeyConstraint("id", name='pk quote " two'),
                sa.Index('ix quote " two', "name"),
                sa.UniqueConstraint("data", name='uq quote" two'),
                sa.ForeignKeyConstraint(
                    ["id"], ["related.id"], name='fk quote " two'
                ),
                sa.CheckConstraint("name != 'foo'", name='ck quote " two '),
                comment=r"""quote " two comment""",
                test_needs_fk=True,
            )

        # target of the foreign keys declared above
        Table(
            "related",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("related", Integer),
            test_needs_fk=True,
        )

        if testing.requires.view_column_reflection.enabled:
            names = ["quote ' one"]
            if testing.requires.symbol_names_w_double_quote.enabled:
                names.append('quote " two')

            quote = testing.db.dialect.identifier_preparer.quote
            for name in names:
                quoted_view = quote("view %s" % name)
                create_view = "CREATE VIEW %s AS SELECT * FROM %s" % (
                    quoted_view,
                    quote(name),
                )

                # views are created after, and dropped before, the tables
                event.listen(metadata, "after_create", DDL(create_view))
                event.listen(
                    metadata,
                    "before_drop",
                    DDL("DROP VIEW %s" % quoted_view),
                )

    def quote_fixtures(fn):
        # used as a plain decorator inside the class body, so it takes
        # the decorated function rather than ``self``
        parametrize = testing.combinations(
            ("quote ' one",),
            ('quote " two', testing.requires.symbol_names_w_double_quote),
        )
        return parametrize(fn)

    @quote_fixtures
    def test_get_table_options(self, name):
        # no assertion on the result; the call itself is the test
        inspect(testing.db).get_table_options(name)

    @quote_fixtures
    @testing.requires.view_column_reflection
    def test_get_view_definition(self, name):
        assert inspect(testing.db).get_view_definition("view %s" % name)

    @quote_fixtures
    def test_get_columns(self, name):
        assert inspect(testing.db).get_columns(name)

    @quote_fixtures
    def test_get_pk_constraint(self, name):
        assert inspect(testing.db).get_pk_constraint(name)

    @quote_fixtures
    def test_get_foreign_keys(self, name):
        assert inspect(testing.db).get_foreign_keys(name)

    @quote_fixtures
    def test_get_indexes(self, name):
        assert inspect(testing.db).get_indexes(name)

    @quote_fixtures
    @testing.requires.unique_constraint_reflection
    def test_get_unique_constraints(self, name):
        assert inspect(testing.db).get_unique_constraints(name)

    @quote_fixtures
    @testing.requires.comment_reflection
    def test_get_table_comment(self, name):
        assert inspect(testing.db).get_table_comment(name)

    @quote_fixtures
    @testing.requires.check_constraint_reflection
    def test_get_check_constraints(self, name):
        assert inspect(testing.db).get_check_constraints(name)
224
225
226class ComponentReflectionTest(fixtures.TablesTest):
227    run_inserts = run_deletes = None
228
229    __backend__ = True
230
231    @classmethod
232    def setup_bind(cls):
233        if config.requirements.independent_connections.enabled:
234            from sqlalchemy import pool
235
236            return engines.testing_engine(
237                options=dict(poolclass=pool.StaticPool)
238            )
239        else:
240            return config.db
241
242    @classmethod
    def define_tables(cls, metadata):
        """Declare the reflected fixture tables.

        The tables are always defined with ``schema=None``; when the
        ``schemas`` requirement is enabled they are defined a second
        time under ``testing.config.test_schema``.
        """
        cls.define_reflected_tables(metadata, None)
        if testing.requires.schemas.enabled:
            cls.define_reflected_tables(metadata, testing.config.test_schema)
247
248    @classmethod
249    def define_reflected_tables(cls, metadata, schema):
250        if schema:
251            schema_prefix = schema + "."
252        else:
253            schema_prefix = ""
254
255        if testing.requires.self_referential_foreign_keys.enabled:
256            users = Table(
257                "users",
258                metadata,
259                Column("user_id", sa.INT, primary_key=True),
260                Column("test1", sa.CHAR(5), nullable=False),
261                Column("test2", sa.Float(5), nullable=False),
262                Column(
263                    "parent_user_id",
264                    sa.Integer,
265                    sa.ForeignKey(
266                        "%susers.user_id" % schema_prefix, name="user_id_fk"
267                    ),
268                ),
269                schema=schema,
270                test_needs_fk=True,
271            )
272        else:
273            users = Table(
274                "users",
275                metadata,
276                Column("user_id", sa.INT, primary_key=True),
277                Column("test1", sa.CHAR(5), nullable=False),
278                Column("test2", sa.Float(5), nullable=False),
279                schema=schema,
280                test_needs_fk=True,
281            )
282
283        Table(
284            "dingalings",
285            metadata,
286            Column("dingaling_id", sa.Integer, primary_key=True),
287            Column(
288                "address_id",
289                sa.Integer,
290                sa.ForeignKey("%semail_addresses.address_id" % schema_prefix),
291            ),
292            Column("data", sa.String(30)),
293            schema=schema,
294            test_needs_fk=True,
295        )
296        Table(
297            "email_addresses",
298            metadata,
299            Column("address_id", sa.Integer),
300            Column(
301                "remote_user_id", sa.Integer, sa.ForeignKey(users.c.user_id)
302            ),
303            Column("email_address", sa.String(20)),
304            sa.PrimaryKeyConstraint("address_id", name="email_ad_pk"),
305            schema=schema,
306            test_needs_fk=True,
307        )
308        Table(
309            "comment_test",
310            metadata,
311            Column("id", sa.Integer, primary_key=True, comment="id comment"),
312            Column("data", sa.String(20), comment="data % comment"),
313            Column(
314                "d2",
315                sa.String(20),
316                comment=r"""Comment types type speedily ' " \ '' Fun!""",
317            ),
318            schema=schema,
319            comment=r"""the test % ' " \ table comment""",
320        )
321
322        if testing.requires.cross_schema_fk_reflection.enabled:
323            if schema is None:
324                Table(
325                    "local_table",
326                    metadata,
327                    Column("id", sa.Integer, primary_key=True),
328                    Column("data", sa.String(20)),
329                    Column(
330                        "remote_id",
331                        ForeignKey(
332                            "%s.remote_table_2.id" % testing.config.test_schema
333                        ),
334                    ),
335                    test_needs_fk=True,
336                    schema=config.db.dialect.default_schema_name,
337                )
338            else:
339                Table(
340                    "remote_table",
341                    metadata,
342                    Column("id", sa.Integer, primary_key=True),
343                    Column(
344                        "local_id",
345                        ForeignKey(
346                            "%s.local_table.id"
347                            % config.db.dialect.default_schema_name
348                        ),
349                    ),
350                    Column("data", sa.String(20)),
351                    schema=schema,
352                    test_needs_fk=True,
353                )
354                Table(
355                    "remote_table_2",
356                    metadata,
357                    Column("id", sa.Integer, primary_key=True),
358                    Column("data", sa.String(20)),
359                    schema=schema,
360                    test_needs_fk=True,
361                )
362
363        if testing.requires.index_reflection.enabled:
364            cls.define_index(metadata, users)
365
366            if not schema:
367                # test_needs_fk is at the moment to force MySQL InnoDB
368                noncol_idx_test_nopk = Table(
369                    "noncol_idx_test_nopk",
370                    metadata,
371                    Column("q", sa.String(5)),
372                    test_needs_fk=True,
373                )
374
375                noncol_idx_test_pk = Table(
376                    "noncol_idx_test_pk",
377                    metadata,
378                    Column("id", sa.Integer, primary_key=True),
379                    Column("q", sa.String(5)),
380                    test_needs_fk=True,
381                )
382
383                if testing.requires.indexes_with_ascdesc.enabled:
384                    Index("noncol_idx_nopk", noncol_idx_test_nopk.c.q.desc())
385                    Index("noncol_idx_pk", noncol_idx_test_pk.c.q.desc())
386
387        if testing.requires.view_column_reflection.enabled:
388            cls.define_views(metadata, schema)
389        if not schema and testing.requires.temp_table_reflection.enabled:
390            cls.define_temp_tables(metadata)
391
392    @classmethod
393    def define_temp_tables(cls, metadata):
394        kw = temp_table_keyword_args(config, config.db)
395        user_tmp = Table(
396            "user_tmp",
397            metadata,
398            Column("id", sa.INT, primary_key=True),
399            Column("name", sa.VARCHAR(50)),
400            Column("foo", sa.INT),
401            sa.UniqueConstraint("name", name="user_tmp_uq"),
402            sa.Index("user_tmp_ix", "foo"),
403            **kw
404        )
405        if (
406            testing.requires.view_reflection.enabled
407            and testing.requires.temporary_views.enabled
408        ):
409            event.listen(
410                user_tmp,
411                "after_create",
412                DDL(
413                    "create temporary view user_tmp_v as "
414                    "select * from user_tmp"
415                ),
416            )
417            event.listen(user_tmp, "before_drop", DDL("drop view user_tmp_v"))
418
419    @classmethod
420    def define_index(cls, metadata, users):
421        Index("users_t_idx", users.c.test1, users.c.test2)
422        Index("users_all_idx", users.c.user_id, users.c.test2, users.c.test1)
423
424    @classmethod
425    def define_views(cls, metadata, schema):
426        for table_name in ("users", "email_addresses"):
427            fullname = table_name
428            if schema:
429                fullname = "%s.%s" % (schema, table_name)
430            view_name = fullname + "_v"
431            query = "CREATE VIEW %s AS SELECT * FROM %s" % (
432                view_name,
433                fullname,
434            )
435
436            event.listen(metadata, "after_create", DDL(query))
437            event.listen(
438                metadata, "before_drop", DDL("DROP VIEW %s" % view_name)
439            )
440
441    @testing.requires.schema_reflection
442    def test_get_schema_names(self):
443        insp = inspect(testing.db)
444
445        self.assert_(testing.config.test_schema in insp.get_schema_names())
446
447    @testing.requires.schema_reflection
448    def test_dialect_initialize(self):
449        engine = engines.testing_engine()
450        assert not hasattr(engine.dialect, "default_schema_name")
451        inspect(engine)
452        assert hasattr(engine.dialect, "default_schema_name")
453
454    @testing.requires.schema_reflection
455    def test_get_default_schema_name(self):
456        insp = inspect(testing.db)
457        eq_(insp.default_schema_name, testing.db.dialect.default_schema_name)
458
459    @testing.provide_metadata
460    def _test_get_table_names(
461        self, schema=None, table_type="table", order_by=None
462    ):
463        _ignore_tables = [
464            "comment_test",
465            "noncol_idx_test_pk",
466            "noncol_idx_test_nopk",
467            "local_table",
468            "remote_table",
469            "remote_table_2",
470        ]
471        meta = self.metadata
472
473        insp = inspect(meta.bind)
474
475        if table_type == "view":
476            table_names = insp.get_view_names(schema)
477            table_names.sort()
478            answer = ["email_addresses_v", "users_v"]
479            eq_(sorted(table_names), answer)
480        else:
481            if order_by:
482                tables = [
483                    rec[0]
484                    for rec in insp.get_sorted_table_and_fkc_names(schema)
485                    if rec[0]
486                ]
487            else:
488                tables = insp.get_table_names(schema)
489            table_names = [t for t in tables if t not in _ignore_tables]
490
491            if order_by == "foreign_key":
492                answer = ["users", "email_addresses", "dingalings"]
493                eq_(table_names, answer)
494            else:
495                answer = ["dingalings", "email_addresses", "users"]
496                eq_(sorted(table_names), answer)
497
498    @testing.requires.temp_table_names
499    def test_get_temp_table_names(self):
500        insp = inspect(self.bind)
501        temp_table_names = insp.get_temp_table_names()
502        eq_(sorted(temp_table_names), ["user_tmp"])
503
504    @testing.requires.view_reflection
505    @testing.requires.temp_table_names
506    @testing.requires.temporary_views
507    def test_get_temp_view_names(self):
508        insp = inspect(self.bind)
509        temp_table_names = insp.get_temp_view_names()
510        eq_(sorted(temp_table_names), ["user_tmp_v"])
511
512    @testing.requires.table_reflection
    def test_get_table_names(self):
        """Run the shared table-name check with its default arguments."""
        self._test_get_table_names()
515
516    @testing.requires.table_reflection
517    @testing.requires.foreign_key_constraint_reflection
    def test_get_table_names_fks(self):
        """Run the shared table-name check with
        ``order_by="foreign_key"``."""
        self._test_get_table_names(order_by="foreign_key")
520
521    @testing.requires.comment_reflection
    def test_get_comments(self):
        """Run the shared comment-reflection check with no schema."""
        self._test_get_comments()
524
525    @testing.requires.comment_reflection
526    @testing.requires.schemas
    def test_get_comments_with_schema(self):
        """Run the shared comment-reflection check against the
        configured test schema."""
        self._test_get_comments(testing.config.test_schema)
529
530    def _test_get_comments(self, schema=None):
531        insp = inspect(testing.db)
532
533        eq_(
534            insp.get_table_comment("comment_test", schema=schema),
535            {"text": r"""the test % ' " \ table comment"""},
536        )
537
538        eq_(insp.get_table_comment("users", schema=schema), {"text": None})
539
540        eq_(
541            [
542                {"name": rec["name"], "comment": rec["comment"]}
543                for rec in insp.get_columns("comment_test", schema=schema)
544            ],
545            [
546                {"comment": "id comment", "name": "id"},
547                {"comment": "data % comment", "name": "data"},
548                {
549                    "comment": (
550                        r"""Comment types type speedily ' " \ '' Fun!"""
551                    ),
552                    "name": "d2",
553                },
554            ],
555        )
556
557    @testing.requires.table_reflection
558    @testing.requires.schemas
    def test_get_table_names_with_schema(self):
        """Run the shared table-name check against the configured test
        schema."""
        self._test_get_table_names(testing.config.test_schema)
561
562    @testing.requires.view_column_reflection
    def test_get_view_names(self):
        """Run the shared name check in view mode with no schema."""
        self._test_get_table_names(table_type="view")
565
566    @testing.requires.view_column_reflection
567    @testing.requires.schemas
    def test_get_view_names_with_schema(self):
        """Run the shared name check in view mode against the configured
        test schema."""
        self._test_get_table_names(
            testing.config.test_schema, table_type="view"
        )
572
573    @testing.requires.table_reflection
574    @testing.requires.view_column_reflection
    def test_get_tables_and_views(self):
        """Run the shared name check for tables and then for views
        within a single test."""
        self._test_get_table_names()
        self._test_get_table_names(table_type="view")
578
579    def _test_get_columns(self, schema=None, table_type="table"):
580        meta = MetaData(testing.db)
581        users, addresses = (self.tables.users, self.tables.email_addresses)
582        table_names = ["users", "email_addresses"]
583        if table_type == "view":
584            table_names = ["users_v", "email_addresses_v"]
585        insp = inspect(meta.bind)
586        for table_name, table in zip(table_names, (users, addresses)):
587            schema_name = schema
588            cols = insp.get_columns(table_name, schema=schema_name)
589            self.assert_(len(cols) > 0, len(cols))
590
591            # should be in order
592
593            for i, col in enumerate(table.columns):
594                eq_(col.name, cols[i]["name"])
595                ctype = cols[i]["type"].__class__
596                ctype_def = col.type
597                if isinstance(ctype_def, sa.types.TypeEngine):
598                    ctype_def = ctype_def.__class__
599
600                # Oracle returns Date for DateTime.
601
602                if testing.against("oracle") and ctype_def in (
603                    sql_types.Date,
604                    sql_types.DateTime,
605                ):
606                    ctype_def = sql_types.Date
607
608                # assert that the desired type and return type share
609                # a base within one of the generic types.
610
611                self.assert_(
612                    len(
613                        set(ctype.__mro__)
614                        .intersection(ctype_def.__mro__)
615                        .intersection(
616                            [
617                                sql_types.Integer,
618                                sql_types.Numeric,
619                                sql_types.DateTime,
620                                sql_types.Date,
621                                sql_types.Time,
622                                sql_types.String,
623                                sql_types._Binary,
624                            ]
625                        )
626                    )
627                    > 0,
628                    "%s(%s), %s(%s)"
629                    % (col.name, col.type, cols[i]["name"], ctype),
630                )
631
632                if not col.primary_key:
633                    assert cols[i]["default"] is None
634
635    @testing.requires.table_reflection
    def test_get_columns(self):
        """Run the shared column-reflection check with its default
        arguments."""
        self._test_get_columns()
638
639    @testing.provide_metadata
640    def _type_round_trip(self, *types):
641        t = Table(
642            "t",
643            self.metadata,
644            *[Column("t%d" % i, type_) for i, type_ in enumerate(types)]
645        )
646        t.create()
647
648        return [
649            c["type"] for c in inspect(self.metadata.bind).get_columns("t")
650        ]
651
652    @testing.requires.table_reflection
653    def test_numeric_reflection(self):
654        for typ in self._type_round_trip(sql_types.Numeric(18, 5)):
655            assert isinstance(typ, sql_types.Numeric)
656            eq_(typ.precision, 18)
657            eq_(typ.scale, 5)
658
659    @testing.requires.table_reflection
660    def test_varchar_reflection(self):
661        typ = self._type_round_trip(sql_types.String(52))[0]
662        assert isinstance(typ, sql_types.String)
663        eq_(typ.length, 52)
664
665    @testing.requires.table_reflection
666    @testing.provide_metadata
667    def test_nullable_reflection(self):
668        t = Table(
669            "t",
670            self.metadata,
671            Column("a", Integer, nullable=True),
672            Column("b", Integer, nullable=False),
673        )
674        t.create()
675        eq_(
676            dict(
677                (col["name"], col["nullable"])
678                for col in inspect(self.metadata.bind).get_columns("t")
679            ),
680            {"a": True, "b": False},
681        )
682
683    @testing.requires.table_reflection
684    @testing.requires.schemas
    def test_get_columns_with_schema(self):
        """Run the shared column-reflection check against the configured
        test schema."""
        self._test_get_columns(schema=testing.config.test_schema)
687
688    @testing.requires.temp_table_reflection
689    def test_get_temp_table_columns(self):
690        meta = MetaData(self.bind)
691        user_tmp = self.tables.user_tmp
692        insp = inspect(meta.bind)
693        cols = insp.get_columns("user_tmp")
694        self.assert_(len(cols) > 0, len(cols))
695
696        for i, col in enumerate(user_tmp.columns):
697            eq_(col.name, cols[i]["name"])
698
699    @testing.requires.temp_table_reflection
700    @testing.requires.view_column_reflection
701    @testing.requires.temporary_views
702    def test_get_temp_view_columns(self):
703        insp = inspect(self.bind)
704        cols = insp.get_columns("user_tmp_v")
705        eq_([col["name"] for col in cols], ["id", "name", "foo"])
706
707    @testing.requires.view_column_reflection
    def test_get_view_columns(self):
        """Run the shared column-reflection check in view mode."""
        self._test_get_columns(table_type="view")
710
711    @testing.requires.view_column_reflection
712    @testing.requires.schemas
    def test_get_view_columns_with_schema(self):
        """Run the shared column-reflection check in view mode against
        the configured test schema."""
        self._test_get_columns(
            schema=testing.config.test_schema, table_type="view"
        )
717
718    @testing.provide_metadata
719    def _test_get_pk_constraint(self, schema=None):
720        meta = self.metadata
721        users, addresses = self.tables.users, self.tables.email_addresses
722        insp = inspect(meta.bind)
723
724        users_cons = insp.get_pk_constraint(users.name, schema=schema)
725        users_pkeys = users_cons["constrained_columns"]
726        eq_(users_pkeys, ["user_id"])
727
728        addr_cons = insp.get_pk_constraint(addresses.name, schema=schema)
729        addr_pkeys = addr_cons["constrained_columns"]
730        eq_(addr_pkeys, ["address_id"])
731
732        with testing.requires.reflects_pk_names.fail_if():
733            eq_(addr_cons["name"], "email_ad_pk")
734
735    @testing.requires.primary_key_constraint_reflection
    def test_get_pk_constraint(self):
        """Run the shared primary-key check with no schema."""
        self._test_get_pk_constraint()
738
739    @testing.requires.table_reflection
740    @testing.requires.primary_key_constraint_reflection
741    @testing.requires.schemas
    def test_get_pk_constraint_with_schema(self):
        """Run the shared primary-key check against the configured test
        schema."""
        self._test_get_pk_constraint(schema=testing.config.test_schema)
744
745    @testing.requires.table_reflection
746    @testing.provide_metadata
747    def test_deprecated_get_primary_keys(self):
748        meta = self.metadata
749        users = self.tables.users
750        insp = Inspector(meta.bind)
751        assert_raises_message(
752            sa_exc.SADeprecationWarning,
753            r".*get_primary_keys\(\) method is deprecated",
754            insp.get_primary_keys,
755            users.name,
756        )
757
758    @testing.provide_metadata
759    def _test_get_foreign_keys(self, schema=None):
760        meta = self.metadata
761        users, addresses = (self.tables.users, self.tables.email_addresses)
762        insp = inspect(meta.bind)
763        expected_schema = schema
764        # users
765
766        if testing.requires.self_referential_foreign_keys.enabled:
767            users_fkeys = insp.get_foreign_keys(users.name, schema=schema)
768            fkey1 = users_fkeys[0]
769
770            with testing.requires.named_constraints.fail_if():
771                eq_(fkey1["name"], "user_id_fk")
772
773            eq_(fkey1["referred_schema"], expected_schema)
774            eq_(fkey1["referred_table"], users.name)
775            eq_(fkey1["referred_columns"], ["user_id"])
776            if testing.requires.self_referential_foreign_keys.enabled:
777                eq_(fkey1["constrained_columns"], ["parent_user_id"])
778
779        # addresses
780        addr_fkeys = insp.get_foreign_keys(addresses.name, schema=schema)
781        fkey1 = addr_fkeys[0]
782
783        with testing.requires.implicitly_named_constraints.fail_if():
784            self.assert_(fkey1["name"] is not None)
785
786        eq_(fkey1["referred_schema"], expected_schema)
787        eq_(fkey1["referred_table"], users.name)
788        eq_(fkey1["referred_columns"], ["user_id"])
789        eq_(fkey1["constrained_columns"], ["remote_user_id"])
790
791    @testing.requires.foreign_key_constraint_reflection
    def test_get_foreign_keys(self):
        """Run the shared foreign-key check with no schema."""
        self._test_get_foreign_keys()
794
795    @testing.requires.foreign_key_constraint_reflection
796    @testing.requires.schemas
    def test_get_foreign_keys_with_schema(self):
        """Run the shared foreign-key check against the configured test
        schema."""
        self._test_get_foreign_keys(schema=testing.config.test_schema)
799
800    @testing.requires.cross_schema_fk_reflection
801    @testing.requires.schemas
802    def test_get_inter_schema_foreign_keys(self):
803        local_table, remote_table, remote_table_2 = self.tables(
804            "%s.local_table" % testing.db.dialect.default_schema_name,
805            "%s.remote_table" % testing.config.test_schema,
806            "%s.remote_table_2" % testing.config.test_schema,
807        )
808
809        insp = inspect(config.db)
810
811        local_fkeys = insp.get_foreign_keys(local_table.name)
812        eq_(len(local_fkeys), 1)
813
814        fkey1 = local_fkeys[0]
815        eq_(fkey1["referred_schema"], testing.config.test_schema)
816        eq_(fkey1["referred_table"], remote_table_2.name)
817        eq_(fkey1["referred_columns"], ["id"])
818        eq_(fkey1["constrained_columns"], ["remote_id"])
819
820        remote_fkeys = insp.get_foreign_keys(
821            remote_table.name, schema=testing.config.test_schema
822        )
823        eq_(len(remote_fkeys), 1)
824
825        fkey2 = remote_fkeys[0]
826
827        assert fkey2["referred_schema"] in (
828            None,
829            testing.db.dialect.default_schema_name,
830        )
831        eq_(fkey2["referred_table"], local_table.name)
832        eq_(fkey2["referred_columns"], ["id"])
833        eq_(fkey2["constrained_columns"], ["local_id"])
834
835    @testing.requires.foreign_key_constraint_option_reflection_ondelete
    def test_get_foreign_key_options_ondelete(self):
        """Run the shared foreign-key options check with
        ``ondelete="CASCADE"``."""
        self._test_get_foreign_key_options(ondelete="CASCADE")
838
839    @testing.requires.foreign_key_constraint_option_reflection_onupdate
    def test_get_foreign_key_options_onupdate(self):
        """Run the shared foreign-key options check with
        ``onupdate="SET NULL"``."""
        self._test_get_foreign_key_options(onupdate="SET NULL")
842
843    @testing.provide_metadata
844    def _test_get_foreign_key_options(self, **options):
845        meta = self.metadata
846
847        Table(
848            "x",
849            meta,
850            Column("id", Integer, primary_key=True),
851            test_needs_fk=True,
852        )
853
854        Table(
855            "table",
856            meta,
857            Column("id", Integer, primary_key=True),
858            Column("x_id", Integer, sa.ForeignKey("x.id", name="xid")),
859            Column("test", String(10)),
860            test_needs_fk=True,
861        )
862
863        Table(
864            "user",
865            meta,
866            Column("id", Integer, primary_key=True),
867            Column("name", String(50), nullable=False),
868            Column("tid", Integer),
869            sa.ForeignKeyConstraint(
870                ["tid"], ["table.id"], name="myfk", **options
871            ),
872            test_needs_fk=True,
873        )
874
875        meta.create_all()
876
877        insp = inspect(meta.bind)
878
879        # test 'options' is always present for a backend
880        # that can reflect these, since alembic looks for this
881        opts = insp.get_foreign_keys("table")[0]["options"]
882
883        eq_(dict((k, opts[k]) for k in opts if opts[k]), {})
884
885        opts = insp.get_foreign_keys("user")[0]["options"]
886        eq_(dict((k, opts[k]) for k in opts if opts[k]), options)
887
888    def _assert_insp_indexes(self, indexes, expected_indexes):
889        index_names = [d["name"] for d in indexes]
890        for e_index in expected_indexes:
891            assert e_index["name"] in index_names
892            index = indexes[index_names.index(e_index["name"])]
893            for key in e_index:
894                eq_(e_index[key], index[key])
895
896    @testing.provide_metadata
897    def _test_get_indexes(self, schema=None):
898        meta = self.metadata
899
900        # The database may decide to create indexes for foreign keys, etc.
901        # so there may be more indexes than expected.
902        insp = inspect(meta.bind)
903        indexes = insp.get_indexes("users", schema=schema)
904        expected_indexes = [
905            {
906                "unique": False,
907                "column_names": ["test1", "test2"],
908                "name": "users_t_idx",
909            },
910            {
911                "unique": False,
912                "column_names": ["user_id", "test2", "test1"],
913                "name": "users_all_idx",
914            },
915        ]
916        self._assert_insp_indexes(indexes, expected_indexes)
917
918    @testing.requires.index_reflection
919    def test_get_indexes(self):
920        self._test_get_indexes()
921
922    @testing.requires.index_reflection
923    @testing.requires.schemas
924    def test_get_indexes_with_schema(self):
925        self._test_get_indexes(schema=testing.config.test_schema)
926
927    @testing.provide_metadata
928    def _test_get_noncol_index(self, tname, ixname):
929        meta = self.metadata
930        insp = inspect(meta.bind)
931        indexes = insp.get_indexes(tname)
932
933        # reflecting an index that has "x DESC" in it as the column.
934        # the DB may or may not give us "x", but make sure we get the index
935        # back, it has a name, it's connected to the table.
936        expected_indexes = [{"unique": False, "name": ixname}]
937        self._assert_insp_indexes(indexes, expected_indexes)
938
939        t = Table(tname, meta, autoload_with=meta.bind)
940        eq_(len(t.indexes), 1)
941        is_(list(t.indexes)[0].table, t)
942        eq_(list(t.indexes)[0].name, ixname)
943
944    @testing.requires.index_reflection
945    @testing.requires.indexes_with_ascdesc
946    def test_get_noncol_index_no_pk(self):
947        self._test_get_noncol_index("noncol_idx_test_nopk", "noncol_idx_nopk")
948
949    @testing.requires.index_reflection
950    @testing.requires.indexes_with_ascdesc
951    def test_get_noncol_index_pk(self):
952        self._test_get_noncol_index("noncol_idx_test_pk", "noncol_idx_pk")
953
954    @testing.requires.indexes_with_expressions
955    @testing.provide_metadata
956    def test_reflect_expression_based_indexes(self):
957        Table(
958            "t",
959            self.metadata,
960            Column("x", String(30)),
961            Column("y", String(30)),
962        )
963        event.listen(
964            self.metadata,
965            "after_create",
966            DDL("CREATE INDEX t_idx ON t(lower(x), lower(y))"),
967        )
968        event.listen(
969            self.metadata, "after_create", DDL("CREATE INDEX t_idx_2 ON t(x)")
970        )
971        self.metadata.create_all()
972
973        insp = inspect(self.metadata.bind)
974
975        with expect_warnings(
976            "Skipped unsupported reflection of expression-based index t_idx"
977        ):
978            eq_(
979                insp.get_indexes("t"),
980                [{"name": "t_idx_2", "column_names": ["x"], "unique": 0}],
981            )
982
983    @testing.requires.unique_constraint_reflection
984    def test_get_unique_constraints(self):
985        self._test_get_unique_constraints()
986
987    @testing.requires.temp_table_reflection
988    @testing.requires.unique_constraint_reflection
989    def test_get_temp_table_unique_constraints(self):
990        insp = inspect(self.bind)
991        reflected = insp.get_unique_constraints("user_tmp")
992        for refl in reflected:
993            # Different dialects handle duplicate index and constraints
994            # differently, so ignore this flag
995            refl.pop("duplicates_index", None)
996        eq_(reflected, [{"column_names": ["name"], "name": "user_tmp_uq"}])
997
998    @testing.requires.temp_table_reflection
999    def test_get_temp_table_indexes(self):
1000        insp = inspect(self.bind)
1001        indexes = insp.get_indexes("user_tmp")
1002        for ind in indexes:
1003            ind.pop("dialect_options", None)
1004        eq_(
1005            # TODO: we need to add better filtering for indexes/uq constraints
1006            # that are doubled up
1007            [idx for idx in indexes if idx["name"] == "user_tmp_ix"],
1008            [
1009                {
1010                    "unique": False,
1011                    "column_names": ["foo"],
1012                    "name": "user_tmp_ix",
1013                }
1014            ],
1015        )
1016
1017    @testing.requires.unique_constraint_reflection
1018    @testing.requires.schemas
1019    def test_get_unique_constraints_with_schema(self):
1020        self._test_get_unique_constraints(schema=testing.config.test_schema)
1021
1022    @testing.provide_metadata
1023    def _test_get_unique_constraints(self, schema=None):
1024        # SQLite dialect needs to parse the names of the constraints
1025        # separately from what it gets from PRAGMA index_list(), and
1026        # then matches them up.  so same set of column_names in two
1027        # constraints will confuse it.    Perhaps we should no longer
1028        # bother with index_list() here since we have the whole
1029        # CREATE TABLE?
1030        uniques = sorted(
1031            [
1032                {"name": "unique_a", "column_names": ["a"]},
1033                {"name": "unique_a_b_c", "column_names": ["a", "b", "c"]},
1034                {"name": "unique_c_a_b", "column_names": ["c", "a", "b"]},
1035                {"name": "unique_asc_key", "column_names": ["asc", "key"]},
1036                {"name": "i.have.dots", "column_names": ["b"]},
1037                {"name": "i have spaces", "column_names": ["c"]},
1038            ],
1039            key=operator.itemgetter("name"),
1040        )
1041        orig_meta = self.metadata
1042        table = Table(
1043            "testtbl",
1044            orig_meta,
1045            Column("a", sa.String(20)),
1046            Column("b", sa.String(30)),
1047            Column("c", sa.Integer),
1048            # reserved identifiers
1049            Column("asc", sa.String(30)),
1050            Column("key", sa.String(30)),
1051            schema=schema,
1052        )
1053        for uc in uniques:
1054            table.append_constraint(
1055                sa.UniqueConstraint(*uc["column_names"], name=uc["name"])
1056            )
1057        orig_meta.create_all()
1058
1059        inspector = inspect(orig_meta.bind)
1060        reflected = sorted(
1061            inspector.get_unique_constraints("testtbl", schema=schema),
1062            key=operator.itemgetter("name"),
1063        )
1064
1065        names_that_duplicate_index = set()
1066
1067        for orig, refl in zip(uniques, reflected):
1068            # Different dialects handle duplicate index and constraints
1069            # differently, so ignore this flag
1070            dupe = refl.pop("duplicates_index", None)
1071            if dupe:
1072                names_that_duplicate_index.add(dupe)
1073            eq_(orig, refl)
1074
1075        reflected_metadata = MetaData()
1076        reflected = Table(
1077            "testtbl",
1078            reflected_metadata,
1079            autoload_with=orig_meta.bind,
1080            schema=schema,
1081        )
1082
1083        # test "deduplicates for index" logic.   MySQL and Oracle
1084        # "unique constraints" are actually unique indexes (with possible
1085        # exception of a unique that is a dupe of another one in the case
1086        # of Oracle).  make sure # they aren't duplicated.
1087        idx_names = set([idx.name for idx in reflected.indexes])
1088        uq_names = set(
1089            [
1090                uq.name
1091                for uq in reflected.constraints
1092                if isinstance(uq, sa.UniqueConstraint)
1093            ]
1094        ).difference(["unique_c_a_b"])
1095
1096        assert not idx_names.intersection(uq_names)
1097        if names_that_duplicate_index:
1098            eq_(names_that_duplicate_index, idx_names)
1099            eq_(uq_names, set())
1100
1101    @testing.requires.check_constraint_reflection
1102    def test_get_check_constraints(self):
1103        self._test_get_check_constraints()
1104
1105    @testing.requires.check_constraint_reflection
1106    @testing.requires.schemas
1107    def test_get_check_constraints_schema(self):
1108        self._test_get_check_constraints(schema=testing.config.test_schema)
1109
1110    @testing.provide_metadata
1111    def _test_get_check_constraints(self, schema=None):
1112        orig_meta = self.metadata
1113        Table(
1114            "sa_cc",
1115            orig_meta,
1116            Column("a", Integer()),
1117            sa.CheckConstraint("a > 1 AND a < 5", name="cc1"),
1118            sa.CheckConstraint("a = 1 OR (a > 2 AND a < 5)", name="cc2"),
1119            schema=schema,
1120        )
1121
1122        orig_meta.create_all()
1123
1124        inspector = inspect(orig_meta.bind)
1125        reflected = sorted(
1126            inspector.get_check_constraints("sa_cc", schema=schema),
1127            key=operator.itemgetter("name"),
1128        )
1129
1130        # trying to minimize effect of quoting, parenthesis, etc.
1131        # may need to add more to this as new dialects get CHECK
1132        # constraint reflection support
1133        def normalize(sqltext):
1134            return " ".join(
1135                re.findall(r"and|\d|=|a|or|<|>", sqltext.lower(), re.I)
1136            )
1137
1138        reflected = [
1139            {"name": item["name"], "sqltext": normalize(item["sqltext"])}
1140            for item in reflected
1141        ]
1142        eq_(
1143            reflected,
1144            [
1145                {"name": "cc1", "sqltext": "a > 1 and a < 5"},
1146                {"name": "cc2", "sqltext": "a = 1 or a > 2 and a < 5"},
1147            ],
1148        )
1149
1150    @testing.provide_metadata
1151    def _test_get_view_definition(self, schema=None):
1152        meta = self.metadata
1153        view_name1 = "users_v"
1154        view_name2 = "email_addresses_v"
1155        insp = inspect(meta.bind)
1156        v1 = insp.get_view_definition(view_name1, schema=schema)
1157        self.assert_(v1)
1158        v2 = insp.get_view_definition(view_name2, schema=schema)
1159        self.assert_(v2)
1160
1161    @testing.requires.view_reflection
1162    def test_get_view_definition(self):
1163        self._test_get_view_definition()
1164
1165    @testing.requires.view_reflection
1166    @testing.requires.schemas
1167    def test_get_view_definition_with_schema(self):
1168        self._test_get_view_definition(schema=testing.config.test_schema)
1169
1170    @testing.only_on("postgresql", "PG specific feature")
1171    @testing.provide_metadata
1172    def _test_get_table_oid(self, table_name, schema=None):
1173        meta = self.metadata
1174        insp = inspect(meta.bind)
1175        oid = insp.get_table_oid(table_name, schema)
1176        self.assert_(isinstance(oid, int))
1177
1178    def test_get_table_oid(self):
1179        self._test_get_table_oid("users")
1180
1181    @testing.requires.schemas
1182    def test_get_table_oid_with_schema(self):
1183        self._test_get_table_oid("users", schema=testing.config.test_schema)
1184
1185    @testing.requires.table_reflection
1186    @testing.provide_metadata
1187    def test_autoincrement_col(self):
1188        """test that 'autoincrement' is reflected according to sqla's policy.
1189
1190        Don't mark this test as unsupported for any backend !
1191
1192        (technically it fails with MySQL InnoDB since "id" comes before "id2")
1193
1194        A backend is better off not returning "autoincrement" at all,
1195        instead of potentially returning "False" for an auto-incrementing
1196        primary key column.
1197
1198        """
1199
1200        meta = self.metadata
1201        insp = inspect(meta.bind)
1202
1203        for tname, cname in [
1204            ("users", "user_id"),
1205            ("email_addresses", "address_id"),
1206            ("dingalings", "dingaling_id"),
1207        ]:
1208            cols = insp.get_columns(tname)
1209            id_ = {c["name"]: c for c in cols}[cname]
1210            assert id_.get("autoincrement", True)
1211
1212
class NormalizedNameTest(fixtures.TablesTest):
    """Reflection of tables whose lower-case names are explicitly quoted.

    Runs only on backends meeting the ``denormalized_names`` requirement.
    """

    __requires__ = ("denormalized_names",)
    __backend__ = True

    @classmethod
    def define_tables(cls, metadata):
        """Define quoted ``t1`` and ``t2``; ``t2.t1id`` references ``t1``."""
        parent_name = quoted_name("t1", quote=True)
        child_name = quoted_name("t2", quote=True)

        Table(
            parent_name,
            metadata,
            Column("id", Integer, primary_key=True),
        )
        Table(
            child_name,
            metadata,
            Column("id", Integer, primary_key=True),
            Column("t1id", ForeignKey("t1.id")),
        )

    def test_reflect_lowercase_forced_tables(self):
        """The ``t2`` -> ``t1`` foreign key survives both reflection paths."""

        # Path 1: autoload a single table; "t1" is then looked up in the
        # same MetaData collection.
        single_meta = MetaData(testing.db)
        child = Table(
            quoted_name("t2", quote=True), single_meta, autoload=True
        )
        parent = single_meta.tables["t1"]
        assert child.c.t1id.references(parent.c.id)

        # Path 2: MetaData.reflect() restricted to the two tables.
        def wanted(name, m):
            return name.lower() in ("t1", "t2")

        bulk_meta = MetaData(testing.db)
        bulk_meta.reflect(only=wanted)
        reflected = bulk_meta.tables
        assert reflected["t2"].c.t1id.references(reflected["t1"].c.id)

    def test_get_table_names(self):
        """Reflected names of ``t1``/``t2`` are unchanged by case conversion.

        NOTE(review): ``upper()`` can only equal ``lower()`` here if the
        returned names are objects whose case conversion is a no-op,
        presumably quoted ``quoted_name`` instances -- confirm against
        ``quoted_name``.
        """
        tablenames = [
            name
            for name in inspect(testing.db).get_table_names()
            if name.lower() in ("t1", "t2")
        ]

        # both tables must be present; indexing fails otherwise
        for position in (0, 1):
            name = tablenames[position]
            eq_(name.upper(), name.lower())
1251
1252
class ComputedReflectionTest(fixtures.ComputedReflectionFixtureTest):
    """Reflection of computed columns through ``Inspector.get_columns()``.

    The tables (``computed_default_table`` and ``computed_column_table``)
    and the ``normalize()`` helper are provided by the fixture base class.
    """

    def test_computed_col_default_not_set(self):
        """A computed expression must not be reflected as a column default.

        ``computed_default_table`` is reflected because it is the table
        holding the ``with_default`` column this test branches on (see
        ``test_get_column_returns_computed``).  Reflecting
        ``computed_column_table`` instead would leave the ``"42"``
        assertion unexercised -- presumably that table has no such column;
        verify against the fixture.
        """
        insp = inspect(config.db)

        cols = insp.get_columns("computed_default_table")
        for col in cols:
            if col["name"] == "with_default":
                # a real server default is still reflected
                is_true("42" in col["default"])
            elif not col["autoincrement"]:
                is_(col["default"], None)

    def test_get_column_returns_computed(self):
        """Only the computed column carries a ``"computed"`` entry."""
        insp = inspect(config.db)

        cols = insp.get_columns("computed_default_table")
        data = {c["name"]: c for c in cols}
        for key in ("id", "normal", "with_default"):
            is_true("computed" not in data[key])

        comp_data = data["computed_col"]
        is_true("computed" in comp_data)
        is_true("sqltext" in comp_data["computed"])
        eq_(self.normalize(comp_data["computed"]["sqltext"]), "normal+42")

        # "persisted" is present exactly when the backend can reflect it
        reflects_persisted = (
            testing.requires.computed_columns_reflect_persisted.enabled
        )
        eq_("persisted" in comp_data["computed"], reflects_persisted)
        if reflects_persisted:
            eq_(
                comp_data["computed"]["persisted"],
                testing.requires.computed_columns_default_persisted.enabled,
            )

    def check_column(self, data, column, sqltext, persisted):
        """Assert the reflected ``"computed"`` entry of one column.

        :param data: mapping of column name to reflected column dictionary.
        :param column: name of the column to check.
        :param sqltext: expected expression, compared after
         ``self.normalize()`` is applied to the reflected text.
        :param persisted: expected ``"persisted"`` value; only checked when
         the ``computed_columns_reflect_persisted`` requirement is enabled.
        """
        is_true("computed" in data[column])
        comp_data = data[column]["computed"]
        eq_(self.normalize(comp_data["sqltext"]), sqltext)
        if testing.requires.computed_columns_reflect_persisted.enabled:
            is_true("persisted" in comp_data)
            is_(comp_data["persisted"], persisted)

    def test_get_column_returns_persisted(self):
        """Check expression and persistence of each computed column."""
        insp = inspect(config.db)

        cols = insp.get_columns("computed_column_table")
        data = {c["name"]: c for c in cols}

        self.check_column(
            data,
            "computed_no_flag",
            "normal+42",
            testing.requires.computed_columns_default_persisted.enabled,
        )
        if testing.requires.computed_columns_virtual.enabled:
            self.check_column(
                data,
                "computed_virtual",
                "normal+2",
                False,
            )
        if testing.requires.computed_columns_stored.enabled:
            self.check_column(
                data,
                "computed_stored",
                "normal-42",
                True,
            )

    @testing.requires.schemas
    def test_get_column_returns_persisted_with_schema(self):
        """Same check as above, against the table in the test schema.

        The expected expressions differ from the no-schema variant.
        """
        insp = inspect(config.db)

        cols = insp.get_columns(
            "computed_column_table", schema=config.test_schema
        )
        data = {c["name"]: c for c in cols}

        self.check_column(
            data,
            "computed_no_flag",
            "normal/42",
            testing.requires.computed_columns_default_persisted.enabled,
        )
        if testing.requires.computed_columns_virtual.enabled:
            self.check_column(
                data,
                "computed_virtual",
                "normal/2",
                False,
            )
        if testing.requires.computed_columns_stored.enabled:
            self.check_column(
                data,
                "computed_stored",
                "normal*42",
                True,
            )
1349
1350
class CompositeKeyReflectionTest(fixtures.TablesTest):
    """Column order of reflected composite primary and foreign keys."""

    __backend__ = True

    @classmethod
    def define_tables(cls, metadata):
        """Define ``tb1`` and ``tb2`` with composite keys.

        Both keys list their columns in an order different from the
        order in which the columns are declared.
        """
        composite_pk = sa.PrimaryKeyConstraint(
            "name", "id", "attr", name="pk_tb1"
        )
        parent = Table(
            "tb1",
            metadata,
            Column("id", Integer),
            Column("attr", Integer),
            Column("name", sql_types.VARCHAR(20)),
            composite_pk,
            schema=None,
            test_needs_fk=True,
        )

        composite_fk = sa.ForeignKeyConstraint(
            ["pname", "pid", "pattr"],
            [parent.c.name, parent.c.id, parent.c.attr],
            name="fk_tb1_name_id_attr",
        )
        Table(
            "tb2",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("pid", Integer),
            Column("pattr", Integer),
            Column("pname", sql_types.VARCHAR(20)),
            composite_fk,
            schema=None,
            test_needs_fk=True,
        )

    @testing.requires.primary_key_constraint_reflection
    @testing.provide_metadata
    def test_pk_column_order(self):
        """Primary key columns come back in constraint order."""
        # test for issue #5661
        inspector = inspect(self.metadata.bind)
        pk = inspector.get_pk_constraint(self.tables.tb1.name)
        eq_(pk.get("constrained_columns"), ["name", "id", "attr"])

    @testing.requires.foreign_key_constraint_reflection
    @testing.provide_metadata
    def test_fk_column_order(self):
        """Foreign key columns come back in constraint order."""
        # test for issue #5661
        inspector = inspect(self.metadata.bind)
        fkeys = inspector.get_foreign_keys(self.tables.tb2.name)
        eq_(len(fkeys), 1)

        only_fk = fkeys[0]
        eq_(only_fk.get("referred_columns"), ["name", "id", "attr"])
        eq_(only_fk.get("constrained_columns"), ["pname", "pid", "pattr"])
1402
1403
# Names exported by a star-import of this module.
__all__ = (
    "ComponentReflectionTest",
    "QuotedNameArgumentTest",
    "HasTableTest",
    "NormalizedNameTest",
    "ComputedReflectionTest",
    "CompositeKeyReflectionTest",
)
1412