1import operator
2import re
3
4import sqlalchemy as sa
5from .. import assert_raises_message
6from .. import config
7from .. import engines
8from .. import eq_
9from .. import expect_warnings
10from .. import fixtures
11from .. import is_
12from ..schema import Column
13from ..schema import Table
14from ... import event
15from ... import exc as sa_exc
16from ... import ForeignKey
17from ... import inspect
18from ... import Integer
19from ... import MetaData
20from ... import String
21from ... import testing
22from ... import types as sql_types
23from ...engine.reflection import Inspector
24from ...schema import DDL
25from ...schema import Index
26from ...sql.elements import quoted_name
27
28
# Module-level placeholders.  Nothing in the visible part of this file reads
# them: every ``metadata`` / ``users`` used below is a parameter or a local.
# NOTE(review): looks like leftover scaffolding -- confirm before removing.
metadata, users = None, None
30
31
class HasTableTest(fixtures.TablesTest):
    """Exercise ``dialect.has_table()`` against a single fixture table."""

    __backend__ = True

    @classmethod
    def define_tables(cls, metadata):
        """Register the ``test_table`` fixture on *metadata*."""
        columns = (
            Column("id", Integer, primary_key=True),
            Column("data", String(50)),
        )
        Table("test_table", metadata, *columns)

    def test_has_table(self):
        """The fixture table is found; an unknown table name is not."""
        with config.db.begin() as conn:
            dialect = config.db.dialect
            assert dialect.has_table(conn, "test_table")
            assert not dialect.has_table(conn, "nonexistent_table")
48
49
50class ComponentReflectionTest(fixtures.TablesTest):
51    run_inserts = run_deletes = None
52
53    __backend__ = True
54
55    @classmethod
56    def setup_bind(cls):
57        if config.requirements.independent_connections.enabled:
58            from sqlalchemy import pool
59
60            return engines.testing_engine(
61                options=dict(poolclass=pool.StaticPool)
62            )
63        else:
64            return config.db
65
66    @classmethod
67    def define_tables(cls, metadata):
68        cls.define_reflected_tables(metadata, None)
69        if testing.requires.schemas.enabled:
70            cls.define_reflected_tables(metadata, testing.config.test_schema)
71
72    @classmethod
73    def define_reflected_tables(cls, metadata, schema):
74        if schema:
75            schema_prefix = schema + "."
76        else:
77            schema_prefix = ""
78
79        if testing.requires.self_referential_foreign_keys.enabled:
80            users = Table(
81                "users",
82                metadata,
83                Column("user_id", sa.INT, primary_key=True),
84                Column("test1", sa.CHAR(5), nullable=False),
85                Column("test2", sa.Float(5), nullable=False),
86                Column(
87                    "parent_user_id",
88                    sa.Integer,
89                    sa.ForeignKey(
90                        "%susers.user_id" % schema_prefix, name="user_id_fk"
91                    ),
92                ),
93                schema=schema,
94                test_needs_fk=True,
95            )
96        else:
97            users = Table(
98                "users",
99                metadata,
100                Column("user_id", sa.INT, primary_key=True),
101                Column("test1", sa.CHAR(5), nullable=False),
102                Column("test2", sa.Float(5), nullable=False),
103                schema=schema,
104                test_needs_fk=True,
105            )
106
107        Table(
108            "dingalings",
109            metadata,
110            Column("dingaling_id", sa.Integer, primary_key=True),
111            Column(
112                "address_id",
113                sa.Integer,
114                sa.ForeignKey("%semail_addresses.address_id" % schema_prefix),
115            ),
116            Column("data", sa.String(30)),
117            schema=schema,
118            test_needs_fk=True,
119        )
120        Table(
121            "email_addresses",
122            metadata,
123            Column("address_id", sa.Integer),
124            Column(
125                "remote_user_id", sa.Integer, sa.ForeignKey(users.c.user_id)
126            ),
127            Column("email_address", sa.String(20)),
128            sa.PrimaryKeyConstraint("address_id", name="email_ad_pk"),
129            schema=schema,
130            test_needs_fk=True,
131        )
132        Table(
133            "comment_test",
134            metadata,
135            Column("id", sa.Integer, primary_key=True, comment="id comment"),
136            Column("data", sa.String(20), comment="data % comment"),
137            Column(
138                "d2",
139                sa.String(20),
140                comment=r"""Comment types type speedily ' " \ '' Fun!""",
141            ),
142            schema=schema,
143            comment=r"""the test % ' " \ table comment""",
144        )
145
146        if testing.requires.cross_schema_fk_reflection.enabled:
147            if schema is None:
148                Table(
149                    "local_table",
150                    metadata,
151                    Column("id", sa.Integer, primary_key=True),
152                    Column("data", sa.String(20)),
153                    Column(
154                        "remote_id",
155                        ForeignKey(
156                            "%s.remote_table_2.id" % testing.config.test_schema
157                        ),
158                    ),
159                    test_needs_fk=True,
160                    schema=config.db.dialect.default_schema_name,
161                )
162            else:
163                Table(
164                    "remote_table",
165                    metadata,
166                    Column("id", sa.Integer, primary_key=True),
167                    Column(
168                        "local_id",
169                        ForeignKey(
170                            "%s.local_table.id"
171                            % config.db.dialect.default_schema_name
172                        ),
173                    ),
174                    Column("data", sa.String(20)),
175                    schema=schema,
176                    test_needs_fk=True,
177                )
178                Table(
179                    "remote_table_2",
180                    metadata,
181                    Column("id", sa.Integer, primary_key=True),
182                    Column("data", sa.String(20)),
183                    schema=schema,
184                    test_needs_fk=True,
185                )
186
187        if testing.requires.index_reflection.enabled:
188            cls.define_index(metadata, users)
189
190            if not schema:
191                # test_needs_fk is at the moment to force MySQL InnoDB
192                noncol_idx_test_nopk = Table(
193                    "noncol_idx_test_nopk",
194                    metadata,
195                    Column("q", sa.String(5)),
196                    test_needs_fk=True,
197                )
198
199                noncol_idx_test_pk = Table(
200                    "noncol_idx_test_pk",
201                    metadata,
202                    Column("id", sa.Integer, primary_key=True),
203                    Column("q", sa.String(5)),
204                    test_needs_fk=True,
205                )
206                Index("noncol_idx_nopk", noncol_idx_test_nopk.c.q.desc())
207                Index("noncol_idx_pk", noncol_idx_test_pk.c.q.desc())
208
209        if testing.requires.view_column_reflection.enabled:
210            cls.define_views(metadata, schema)
211        if not schema and testing.requires.temp_table_reflection.enabled:
212            cls.define_temp_tables(metadata)
213
214    @classmethod
215    def define_temp_tables(cls, metadata):
216        # cheat a bit, we should fix this with some dialect-level
217        # temp table fixture
218        if testing.against("oracle"):
219            kw = {
220                "prefixes": ["GLOBAL TEMPORARY"],
221                "oracle_on_commit": "PRESERVE ROWS",
222            }
223        else:
224            kw = {"prefixes": ["TEMPORARY"]}
225
226        user_tmp = Table(
227            "user_tmp",
228            metadata,
229            Column("id", sa.INT, primary_key=True),
230            Column("name", sa.VARCHAR(50)),
231            Column("foo", sa.INT),
232            sa.UniqueConstraint("name", name="user_tmp_uq"),
233            sa.Index("user_tmp_ix", "foo"),
234            **kw
235        )
236        if (
237            testing.requires.view_reflection.enabled
238            and testing.requires.temporary_views.enabled
239        ):
240            event.listen(
241                user_tmp,
242                "after_create",
243                DDL(
244                    "create temporary view user_tmp_v as "
245                    "select * from user_tmp"
246                ),
247            )
248            event.listen(user_tmp, "before_drop", DDL("drop view user_tmp_v"))
249
250    @classmethod
251    def define_index(cls, metadata, users):
252        Index("users_t_idx", users.c.test1, users.c.test2)
253        Index("users_all_idx", users.c.user_id, users.c.test2, users.c.test1)
254
255    @classmethod
256    def define_views(cls, metadata, schema):
257        for table_name in ("users", "email_addresses"):
258            fullname = table_name
259            if schema:
260                fullname = "%s.%s" % (schema, table_name)
261            view_name = fullname + "_v"
262            query = "CREATE VIEW %s AS SELECT * FROM %s" % (
263                view_name,
264                fullname,
265            )
266
267            event.listen(metadata, "after_create", DDL(query))
268            event.listen(
269                metadata, "before_drop", DDL("DROP VIEW %s" % view_name)
270            )
271
272    @testing.requires.schema_reflection
273    def test_get_schema_names(self):
274        insp = inspect(testing.db)
275
276        self.assert_(testing.config.test_schema in insp.get_schema_names())
277
278    @testing.requires.schema_reflection
279    def test_dialect_initialize(self):
280        engine = engines.testing_engine()
281        assert not hasattr(engine.dialect, "default_schema_name")
282        inspect(engine)
283        assert hasattr(engine.dialect, "default_schema_name")
284
285    @testing.requires.schema_reflection
286    def test_get_default_schema_name(self):
287        insp = inspect(testing.db)
288        eq_(insp.default_schema_name, testing.db.dialect.default_schema_name)
289
290    @testing.provide_metadata
291    def _test_get_table_names(
292        self, schema=None, table_type="table", order_by=None
293    ):
294        _ignore_tables = [
295            "comment_test",
296            "noncol_idx_test_pk",
297            "noncol_idx_test_nopk",
298            "local_table",
299            "remote_table",
300            "remote_table_2",
301        ]
302        meta = self.metadata
303        users, addresses, dingalings = (
304            self.tables.users,
305            self.tables.email_addresses,
306            self.tables.dingalings,
307        )
308        insp = inspect(meta.bind)
309
310        if table_type == "view":
311            table_names = insp.get_view_names(schema)
312            table_names.sort()
313            answer = ["email_addresses_v", "users_v"]
314            eq_(sorted(table_names), answer)
315        else:
316            table_names = [
317                t
318                for t in insp.get_table_names(schema, order_by=order_by)
319                if t not in _ignore_tables
320            ]
321
322            if order_by == "foreign_key":
323                answer = ["users", "email_addresses", "dingalings"]
324                eq_(table_names, answer)
325            else:
326                answer = ["dingalings", "email_addresses", "users"]
327                eq_(sorted(table_names), answer)
328
329    @testing.requires.temp_table_names
330    def test_get_temp_table_names(self):
331        insp = inspect(self.bind)
332        temp_table_names = insp.get_temp_table_names()
333        eq_(sorted(temp_table_names), ["user_tmp"])
334
335    @testing.requires.view_reflection
336    @testing.requires.temp_table_names
337    @testing.requires.temporary_views
338    def test_get_temp_view_names(self):
339        insp = inspect(self.bind)
340        temp_table_names = insp.get_temp_view_names()
341        eq_(sorted(temp_table_names), ["user_tmp_v"])
342
    @testing.requires.table_reflection
    def test_get_table_names(self):
        """Run the table-name check with its default arguments."""
        self._test_get_table_names()
346
    @testing.requires.table_reflection
    @testing.requires.foreign_key_constraint_reflection
    def test_get_table_names_fks(self):
        """Run the table-name check with ``order_by="foreign_key"``."""
        self._test_get_table_names(order_by="foreign_key")
351
    @testing.requires.comment_reflection
    def test_get_comments(self):
        """Run the comment reflection check without a schema."""
        self._test_get_comments()
355
356    @testing.requires.comment_reflection
357    @testing.requires.schemas
358    def test_get_comments_with_schema(self):
359        self._test_get_comments(testing.config.test_schema)
360
361    def _test_get_comments(self, schema=None):
362        insp = inspect(testing.db)
363
364        eq_(
365            insp.get_table_comment("comment_test", schema=schema),
366            {"text": r"""the test % ' " \ table comment"""},
367        )
368
369        eq_(insp.get_table_comment("users", schema=schema), {"text": None})
370
371        eq_(
372            [
373                {"name": rec["name"], "comment": rec["comment"]}
374                for rec in insp.get_columns("comment_test", schema=schema)
375            ],
376            [
377                {"comment": "id comment", "name": "id"},
378                {"comment": "data % comment", "name": "data"},
379                {
380                    "comment": (
381                        r"""Comment types type speedily ' " \ '' Fun!"""
382                    ),
383                    "name": "d2",
384                },
385            ],
386        )
387
388    @testing.requires.table_reflection
389    @testing.requires.schemas
390    def test_get_table_names_with_schema(self):
391        self._test_get_table_names(testing.config.test_schema)
392
    @testing.requires.view_column_reflection
    def test_get_view_names(self):
        """Run the name check in ``"view"`` mode without a schema."""
        self._test_get_table_names(table_type="view")
396
397    @testing.requires.view_column_reflection
398    @testing.requires.schemas
399    def test_get_view_names_with_schema(self):
400        self._test_get_table_names(
401            testing.config.test_schema, table_type="view"
402        )
403
404    @testing.requires.table_reflection
405    @testing.requires.view_column_reflection
406    def test_get_tables_and_views(self):
407        self._test_get_table_names()
408        self._test_get_table_names(table_type="view")
409
410    def _test_get_columns(self, schema=None, table_type="table"):
411        meta = MetaData(testing.db)
412        users, addresses, dingalings = (
413            self.tables.users,
414            self.tables.email_addresses,
415            self.tables.dingalings,
416        )
417        table_names = ["users", "email_addresses"]
418        if table_type == "view":
419            table_names = ["users_v", "email_addresses_v"]
420        insp = inspect(meta.bind)
421        for table_name, table in zip(table_names, (users, addresses)):
422            schema_name = schema
423            cols = insp.get_columns(table_name, schema=schema_name)
424            self.assert_(len(cols) > 0, len(cols))
425
426            # should be in order
427
428            for i, col in enumerate(table.columns):
429                eq_(col.name, cols[i]["name"])
430                ctype = cols[i]["type"].__class__
431                ctype_def = col.type
432                if isinstance(ctype_def, sa.types.TypeEngine):
433                    ctype_def = ctype_def.__class__
434
435                # Oracle returns Date for DateTime.
436
437                if testing.against("oracle") and ctype_def in (
438                    sql_types.Date,
439                    sql_types.DateTime,
440                ):
441                    ctype_def = sql_types.Date
442
443                # assert that the desired type and return type share
444                # a base within one of the generic types.
445
446                self.assert_(
447                    len(
448                        set(ctype.__mro__)
449                        .intersection(ctype_def.__mro__)
450                        .intersection(
451                            [
452                                sql_types.Integer,
453                                sql_types.Numeric,
454                                sql_types.DateTime,
455                                sql_types.Date,
456                                sql_types.Time,
457                                sql_types.String,
458                                sql_types._Binary,
459                            ]
460                        )
461                    )
462                    > 0,
463                    "%s(%s), %s(%s)"
464                    % (col.name, col.type, cols[i]["name"], ctype),
465                )
466
467                if not col.primary_key:
468                    assert cols[i]["default"] is None
469
    @testing.requires.table_reflection
    def test_get_columns(self):
        """Run the column reflection check with its default arguments."""
        self._test_get_columns()
473
474    @testing.provide_metadata
475    def _type_round_trip(self, *types):
476        t = Table(
477            "t",
478            self.metadata,
479            *[Column("t%d" % i, type_) for i, type_ in enumerate(types)]
480        )
481        t.create()
482
483        return [
484            c["type"] for c in inspect(self.metadata.bind).get_columns("t")
485        ]
486
487    @testing.requires.table_reflection
488    def test_numeric_reflection(self):
489        for typ in self._type_round_trip(sql_types.Numeric(18, 5)):
490            assert isinstance(typ, sql_types.Numeric)
491            eq_(typ.precision, 18)
492            eq_(typ.scale, 5)
493
494    @testing.requires.table_reflection
495    def test_varchar_reflection(self):
496        typ = self._type_round_trip(sql_types.String(52))[0]
497        assert isinstance(typ, sql_types.String)
498        eq_(typ.length, 52)
499
500    @testing.requires.table_reflection
501    @testing.provide_metadata
502    def test_nullable_reflection(self):
503        t = Table(
504            "t",
505            self.metadata,
506            Column("a", Integer, nullable=True),
507            Column("b", Integer, nullable=False),
508        )
509        t.create()
510        eq_(
511            dict(
512                (col["name"], col["nullable"])
513                for col in inspect(self.metadata.bind).get_columns("t")
514            ),
515            {"a": True, "b": False},
516        )
517
    @testing.requires.table_reflection
    @testing.requires.schemas
    def test_get_columns_with_schema(self):
        """Run the column reflection check against the test schema."""
        self._test_get_columns(schema=testing.config.test_schema)
522
523    @testing.requires.temp_table_reflection
524    def test_get_temp_table_columns(self):
525        meta = MetaData(self.bind)
526        user_tmp = self.tables.user_tmp
527        insp = inspect(meta.bind)
528        cols = insp.get_columns("user_tmp")
529        self.assert_(len(cols) > 0, len(cols))
530
531        for i, col in enumerate(user_tmp.columns):
532            eq_(col.name, cols[i]["name"])
533
534    @testing.requires.temp_table_reflection
535    @testing.requires.view_column_reflection
536    @testing.requires.temporary_views
537    def test_get_temp_view_columns(self):
538        insp = inspect(self.bind)
539        cols = insp.get_columns("user_tmp_v")
540        eq_([col["name"] for col in cols], ["id", "name", "foo"])
541
    @testing.requires.view_column_reflection
    def test_get_view_columns(self):
        """Run the column reflection check in ``"view"`` mode."""
        self._test_get_columns(table_type="view")
545
    @testing.requires.view_column_reflection
    @testing.requires.schemas
    def test_get_view_columns_with_schema(self):
        """Run the column reflection check in ``"view"`` mode against the
        test schema.
        """
        self._test_get_columns(
            schema=testing.config.test_schema, table_type="view"
        )
552
553    @testing.provide_metadata
554    def _test_get_pk_constraint(self, schema=None):
555        meta = self.metadata
556        users, addresses = self.tables.users, self.tables.email_addresses
557        insp = inspect(meta.bind)
558
559        users_cons = insp.get_pk_constraint(users.name, schema=schema)
560        users_pkeys = users_cons["constrained_columns"]
561        eq_(users_pkeys, ["user_id"])
562
563        addr_cons = insp.get_pk_constraint(addresses.name, schema=schema)
564        addr_pkeys = addr_cons["constrained_columns"]
565        eq_(addr_pkeys, ["address_id"])
566
567        with testing.requires.reflects_pk_names.fail_if():
568            eq_(addr_cons["name"], "email_ad_pk")
569
    @testing.requires.primary_key_constraint_reflection
    def test_get_pk_constraint(self):
        """Run the primary key reflection check without a schema."""
        self._test_get_pk_constraint()
573
    @testing.requires.table_reflection
    @testing.requires.primary_key_constraint_reflection
    @testing.requires.schemas
    def test_get_pk_constraint_with_schema(self):
        """Run the primary key reflection check against the test schema."""
        self._test_get_pk_constraint(schema=testing.config.test_schema)
579
580    @testing.requires.table_reflection
581    @testing.provide_metadata
582    def test_deprecated_get_primary_keys(self):
583        meta = self.metadata
584        users = self.tables.users
585        insp = Inspector(meta.bind)
586        assert_raises_message(
587            sa_exc.SADeprecationWarning,
588            r".*get_primary_keys\(\) method is deprecated",
589            insp.get_primary_keys,
590            users.name,
591        )
592
593    @testing.provide_metadata
594    def _test_get_foreign_keys(self, schema=None):
595        meta = self.metadata
596        users, addresses, dingalings = (
597            self.tables.users,
598            self.tables.email_addresses,
599            self.tables.dingalings,
600        )
601        insp = inspect(meta.bind)
602        expected_schema = schema
603        # users
604
605        if testing.requires.self_referential_foreign_keys.enabled:
606            users_fkeys = insp.get_foreign_keys(users.name, schema=schema)
607            fkey1 = users_fkeys[0]
608
609            with testing.requires.named_constraints.fail_if():
610                eq_(fkey1["name"], "user_id_fk")
611
612            eq_(fkey1["referred_schema"], expected_schema)
613            eq_(fkey1["referred_table"], users.name)
614            eq_(fkey1["referred_columns"], ["user_id"])
615            if testing.requires.self_referential_foreign_keys.enabled:
616                eq_(fkey1["constrained_columns"], ["parent_user_id"])
617
618        # addresses
619        addr_fkeys = insp.get_foreign_keys(addresses.name, schema=schema)
620        fkey1 = addr_fkeys[0]
621
622        with testing.requires.implicitly_named_constraints.fail_if():
623            self.assert_(fkey1["name"] is not None)
624
625        eq_(fkey1["referred_schema"], expected_schema)
626        eq_(fkey1["referred_table"], users.name)
627        eq_(fkey1["referred_columns"], ["user_id"])
628        eq_(fkey1["constrained_columns"], ["remote_user_id"])
629
    @testing.requires.foreign_key_constraint_reflection
    def test_get_foreign_keys(self):
        """Run the foreign key reflection check without a schema."""
        self._test_get_foreign_keys()
633
    @testing.requires.foreign_key_constraint_reflection
    @testing.requires.schemas
    def test_get_foreign_keys_with_schema(self):
        """Run the foreign key reflection check against the test schema."""
        self._test_get_foreign_keys(schema=testing.config.test_schema)
638
639    @testing.requires.cross_schema_fk_reflection
640    @testing.requires.schemas
641    def test_get_inter_schema_foreign_keys(self):
642        local_table, remote_table, remote_table_2 = self.tables(
643            "%s.local_table" % testing.db.dialect.default_schema_name,
644            "%s.remote_table" % testing.config.test_schema,
645            "%s.remote_table_2" % testing.config.test_schema,
646        )
647
648        insp = inspect(config.db)
649
650        local_fkeys = insp.get_foreign_keys(local_table.name)
651        eq_(len(local_fkeys), 1)
652
653        fkey1 = local_fkeys[0]
654        eq_(fkey1["referred_schema"], testing.config.test_schema)
655        eq_(fkey1["referred_table"], remote_table_2.name)
656        eq_(fkey1["referred_columns"], ["id"])
657        eq_(fkey1["constrained_columns"], ["remote_id"])
658
659        remote_fkeys = insp.get_foreign_keys(
660            remote_table.name, schema=testing.config.test_schema
661        )
662        eq_(len(remote_fkeys), 1)
663
664        fkey2 = remote_fkeys[0]
665
666        assert fkey2["referred_schema"] in (
667            None,
668            testing.db.dialect.default_schema_name,
669        )
670        eq_(fkey2["referred_table"], local_table.name)
671        eq_(fkey2["referred_columns"], ["id"])
672        eq_(fkey2["constrained_columns"], ["local_id"])
673
    @testing.requires.foreign_key_constraint_option_reflection_ondelete
    def test_get_foreign_key_options_ondelete(self):
        """Run the FK option reflection check with ``ondelete="CASCADE"``."""
        self._test_get_foreign_key_options(ondelete="CASCADE")
677
    @testing.requires.foreign_key_constraint_option_reflection_onupdate
    def test_get_foreign_key_options_onupdate(self):
        """Run the FK option reflection check with ``onupdate="SET NULL"``."""
        self._test_get_foreign_key_options(onupdate="SET NULL")
681
682    @testing.provide_metadata
683    def _test_get_foreign_key_options(self, **options):
684        meta = self.metadata
685
686        Table(
687            "x",
688            meta,
689            Column("id", Integer, primary_key=True),
690            test_needs_fk=True,
691        )
692
693        Table(
694            "table",
695            meta,
696            Column("id", Integer, primary_key=True),
697            Column("x_id", Integer, sa.ForeignKey("x.id", name="xid")),
698            Column("test", String(10)),
699            test_needs_fk=True,
700        )
701
702        Table(
703            "user",
704            meta,
705            Column("id", Integer, primary_key=True),
706            Column("name", String(50), nullable=False),
707            Column("tid", Integer),
708            sa.ForeignKeyConstraint(
709                ["tid"], ["table.id"], name="myfk", **options
710            ),
711            test_needs_fk=True,
712        )
713
714        meta.create_all()
715
716        insp = inspect(meta.bind)
717
718        # test 'options' is always present for a backend
719        # that can reflect these, since alembic looks for this
720        opts = insp.get_foreign_keys("table")[0]["options"]
721
722        eq_(dict((k, opts[k]) for k in opts if opts[k]), {})
723
724        opts = insp.get_foreign_keys("user")[0]["options"]
725        eq_(dict((k, opts[k]) for k in opts if opts[k]), options)
726
727    def _assert_insp_indexes(self, indexes, expected_indexes):
728        index_names = [d["name"] for d in indexes]
729        for e_index in expected_indexes:
730            assert e_index["name"] in index_names
731            index = indexes[index_names.index(e_index["name"])]
732            for key in e_index:
733                eq_(e_index[key], index[key])
734
735    @testing.provide_metadata
736    def _test_get_indexes(self, schema=None):
737        meta = self.metadata
738        users, addresses, dingalings = (
739            self.tables.users,
740            self.tables.email_addresses,
741            self.tables.dingalings,
742        )
743        # The database may decide to create indexes for foreign keys, etc.
744        # so there may be more indexes than expected.
745        insp = inspect(meta.bind)
746        indexes = insp.get_indexes("users", schema=schema)
747        expected_indexes = [
748            {
749                "unique": False,
750                "column_names": ["test1", "test2"],
751                "name": "users_t_idx",
752            },
753            {
754                "unique": False,
755                "column_names": ["user_id", "test2", "test1"],
756                "name": "users_all_idx",
757            },
758        ]
759        self._assert_insp_indexes(indexes, expected_indexes)
760
    @testing.requires.index_reflection
    def test_get_indexes(self):
        """Run the index reflection check without a schema."""
        self._test_get_indexes()
764
    @testing.requires.index_reflection
    @testing.requires.schemas
    def test_get_indexes_with_schema(self):
        """Run the index reflection check against the test schema."""
        self._test_get_indexes(schema=testing.config.test_schema)
769
770    @testing.provide_metadata
771    def _test_get_noncol_index(self, tname, ixname):
772        meta = self.metadata
773        insp = inspect(meta.bind)
774        indexes = insp.get_indexes(tname)
775
776        # reflecting an index that has "x DESC" in it as the column.
777        # the DB may or may not give us "x", but make sure we get the index
778        # back, it has a name, it's connected to the table.
779        expected_indexes = [{"unique": False, "name": ixname}]
780        self._assert_insp_indexes(indexes, expected_indexes)
781
782        t = Table(tname, meta, autoload_with=meta.bind)
783        eq_(len(t.indexes), 1)
784        is_(list(t.indexes)[0].table, t)
785        eq_(list(t.indexes)[0].name, ixname)
786
    @testing.requires.index_reflection
    def test_get_noncol_index_no_pk(self):
        """Run the ``DESC`` index check on the table without a primary key."""
        self._test_get_noncol_index("noncol_idx_test_nopk", "noncol_idx_nopk")
790
    @testing.requires.index_reflection
    def test_get_noncol_index_pk(self):
        """Run the ``DESC`` index check on the table with a primary key."""
        self._test_get_noncol_index("noncol_idx_test_pk", "noncol_idx_pk")
794
795    @testing.requires.indexes_with_expressions
796    @testing.provide_metadata
797    def test_reflect_expression_based_indexes(self):
798        Table(
799            "t",
800            self.metadata,
801            Column("x", String(30)),
802            Column("y", String(30)),
803        )
804        event.listen(
805            self.metadata,
806            "after_create",
807            DDL("CREATE INDEX t_idx ON t(lower(x), lower(y))"),
808        )
809        event.listen(
810            self.metadata, "after_create", DDL("CREATE INDEX t_idx_2 ON t(x)")
811        )
812        self.metadata.create_all()
813
814        insp = inspect(self.metadata.bind)
815
816        with expect_warnings(
817            "Skipped unsupported reflection of expression-based index t_idx"
818        ):
819            eq_(
820                insp.get_indexes("t"),
821                [{"name": "t_idx_2", "column_names": ["x"], "unique": 0}],
822            )
823
    @testing.requires.unique_constraint_reflection
    def test_get_unique_constraints(self):
        """Run the unique constraint reflection check without a schema."""
        self._test_get_unique_constraints()
827
828    @testing.requires.temp_table_reflection
829    @testing.requires.unique_constraint_reflection
830    def test_get_temp_table_unique_constraints(self):
831        insp = inspect(self.bind)
832        reflected = insp.get_unique_constraints("user_tmp")
833        for refl in reflected:
834            # Different dialects handle duplicate index and constraints
835            # differently, so ignore this flag
836            refl.pop("duplicates_index", None)
837        eq_(reflected, [{"column_names": ["name"], "name": "user_tmp_uq"}])
838
839    @testing.requires.temp_table_reflection
840    def test_get_temp_table_indexes(self):
841        insp = inspect(self.bind)
842        indexes = insp.get_indexes("user_tmp")
843        for ind in indexes:
844            ind.pop("dialect_options", None)
845        eq_(
846            # TODO: we need to add better filtering for indexes/uq constraints
847            # that are doubled up
848            [idx for idx in indexes if idx["name"] == "user_tmp_ix"],
849            [
850                {
851                    "unique": False,
852                    "column_names": ["foo"],
853                    "name": "user_tmp_ix",
854                }
855            ],
856        )
857
    @testing.requires.unique_constraint_reflection
    @testing.requires.schemas
    def test_get_unique_constraints_with_schema(self):
        """Run the unique constraint reflection check against the test
        schema.
        """
        self._test_get_unique_constraints(schema=testing.config.test_schema)
862
863    @testing.provide_metadata
864    def _test_get_unique_constraints(self, schema=None):
865        # SQLite dialect needs to parse the names of the constraints
866        # separately from what it gets from PRAGMA index_list(), and
867        # then matches them up.  so same set of column_names in two
868        # constraints will confuse it.    Perhaps we should no longer
869        # bother with index_list() here since we have the whole
870        # CREATE TABLE?
871        uniques = sorted(
872            [
873                {"name": "unique_a", "column_names": ["a"]},
874                {"name": "unique_a_b_c", "column_names": ["a", "b", "c"]},
875                {"name": "unique_c_a_b", "column_names": ["c", "a", "b"]},
876                {"name": "unique_asc_key", "column_names": ["asc", "key"]},
877                {"name": "i.have.dots", "column_names": ["b"]},
878                {"name": "i have spaces", "column_names": ["c"]},
879            ],
880            key=operator.itemgetter("name"),
881        )
882        orig_meta = self.metadata
883        table = Table(
884            "testtbl",
885            orig_meta,
886            Column("a", sa.String(20)),
887            Column("b", sa.String(30)),
888            Column("c", sa.Integer),
889            # reserved identifiers
890            Column("asc", sa.String(30)),
891            Column("key", sa.String(30)),
892            schema=schema,
893        )
894        for uc in uniques:
895            table.append_constraint(
896                sa.UniqueConstraint(*uc["column_names"], name=uc["name"])
897            )
898        orig_meta.create_all()
899
900        inspector = inspect(orig_meta.bind)
901        reflected = sorted(
902            inspector.get_unique_constraints("testtbl", schema=schema),
903            key=operator.itemgetter("name"),
904        )
905
906        names_that_duplicate_index = set()
907
908        for orig, refl in zip(uniques, reflected):
909            # Different dialects handle duplicate index and constraints
910            # differently, so ignore this flag
911            dupe = refl.pop("duplicates_index", None)
912            if dupe:
913                names_that_duplicate_index.add(dupe)
914            eq_(orig, refl)
915
916        reflected_metadata = MetaData()
917        reflected = Table(
918            "testtbl",
919            reflected_metadata,
920            autoload_with=orig_meta.bind,
921            schema=schema,
922        )
923
924        # test "deduplicates for index" logic.   MySQL and Oracle
925        # "unique constraints" are actually unique indexes (with possible
926        # exception of a unique that is a dupe of another one in the case
927        # of Oracle).  make sure # they aren't duplicated.
928        idx_names = set([idx.name for idx in reflected.indexes])
929        uq_names = set(
930            [
931                uq.name
932                for uq in reflected.constraints
933                if isinstance(uq, sa.UniqueConstraint)
934            ]
935        ).difference(["unique_c_a_b"])
936
937        assert not idx_names.intersection(uq_names)
938        if names_that_duplicate_index:
939            eq_(names_that_duplicate_index, idx_names)
940            eq_(uq_names, set())
941
942    @testing.requires.check_constraint_reflection
943    def test_get_check_constraints(self):
944        self._test_get_check_constraints()
945
946    @testing.requires.check_constraint_reflection
947    @testing.requires.schemas
948    def test_get_check_constraints_schema(self):
949        self._test_get_check_constraints(schema=testing.config.test_schema)
950
951    @testing.provide_metadata
952    def _test_get_check_constraints(self, schema=None):
953        orig_meta = self.metadata
954        Table(
955            "sa_cc",
956            orig_meta,
957            Column("a", Integer()),
958            sa.CheckConstraint("a > 1 AND a < 5", name="cc1"),
959            sa.CheckConstraint("a = 1 OR (a > 2 AND a < 5)", name="cc2"),
960            schema=schema,
961        )
962
963        orig_meta.create_all()
964
965        inspector = inspect(orig_meta.bind)
966        reflected = sorted(
967            inspector.get_check_constraints("sa_cc", schema=schema),
968            key=operator.itemgetter("name"),
969        )
970
971        # trying to minimize effect of quoting, parenthesis, etc.
972        # may need to add more to this as new dialects get CHECK
973        # constraint reflection support
974        def normalize(sqltext):
975            return " ".join(
976                re.findall(r"and|\d|=|a|or|<|>", sqltext.lower(), re.I)
977            )
978
979        reflected = [
980            {"name": item["name"], "sqltext": normalize(item["sqltext"])}
981            for item in reflected
982        ]
983        eq_(
984            reflected,
985            [
986                {"name": "cc1", "sqltext": "a > 1 and a < 5"},
987                {"name": "cc2", "sqltext": "a = 1 or a > 2 and a < 5"},
988            ],
989        )
990
991    @testing.provide_metadata
992    def _test_get_view_definition(self, schema=None):
993        meta = self.metadata
994        users, addresses, dingalings = (
995            self.tables.users,
996            self.tables.email_addresses,
997            self.tables.dingalings,
998        )
999        view_name1 = "users_v"
1000        view_name2 = "email_addresses_v"
1001        insp = inspect(meta.bind)
1002        v1 = insp.get_view_definition(view_name1, schema=schema)
1003        self.assert_(v1)
1004        v2 = insp.get_view_definition(view_name2, schema=schema)
1005        self.assert_(v2)
1006
1007    @testing.requires.view_reflection
1008    def test_get_view_definition(self):
1009        self._test_get_view_definition()
1010
1011    @testing.requires.view_reflection
1012    @testing.requires.schemas
1013    def test_get_view_definition_with_schema(self):
1014        self._test_get_view_definition(schema=testing.config.test_schema)
1015
1016    @testing.only_on("postgresql", "PG specific feature")
1017    @testing.provide_metadata
1018    def _test_get_table_oid(self, table_name, schema=None):
1019        meta = self.metadata
1020        users, addresses, dingalings = (
1021            self.tables.users,
1022            self.tables.email_addresses,
1023            self.tables.dingalings,
1024        )
1025        insp = inspect(meta.bind)
1026        oid = insp.get_table_oid(table_name, schema)
1027        self.assert_(isinstance(oid, int))
1028
1029    def test_get_table_oid(self):
1030        self._test_get_table_oid("users")
1031
1032    @testing.requires.schemas
1033    def test_get_table_oid_with_schema(self):
1034        self._test_get_table_oid("users", schema=testing.config.test_schema)
1035
1036    @testing.requires.table_reflection
1037    @testing.provide_metadata
1038    def test_autoincrement_col(self):
1039        """test that 'autoincrement' is reflected according to sqla's policy.
1040
1041        Don't mark this test as unsupported for any backend !
1042
1043        (technically it fails with MySQL InnoDB since "id" comes before "id2")
1044
1045        A backend is better off not returning "autoincrement" at all,
1046        instead of potentially returning "False" for an auto-incrementing
1047        primary key column.
1048
1049        """
1050
1051        meta = self.metadata
1052        insp = inspect(meta.bind)
1053
1054        for tname, cname in [
1055            ("users", "user_id"),
1056            ("email_addresses", "address_id"),
1057            ("dingalings", "dingaling_id"),
1058        ]:
1059            cols = insp.get_columns(tname)
1060            id_ = {c["name"]: c for c in cols}[cname]
1061            assert id_.get("autoincrement", True)
1062
1063
class NormalizedNameTest(fixtures.TablesTest):
    """Reflection checks for tables created with force-quoted lowercase names.

    Only runs where the ``denormalized_names`` requirement is met.
    """

    __requires__ = ("denormalized_names",)
    __backend__ = True

    @classmethod
    def define_tables(cls, metadata):
        """Define ``t1`` and ``t2``, both with quoting forced on the name.

        ``t2`` carries a foreign key column ``t1id`` pointing at ``t1.id``.
        """
        parent_name = quoted_name("t1", quote=True)
        child_name = quoted_name("t2", quote=True)

        Table(
            parent_name,
            metadata,
            Column("id", Integer, primary_key=True),
        )
        Table(
            child_name,
            metadata,
            Column("id", Integer, primary_key=True),
            Column("t1id", ForeignKey("t1.id")),
        )

    def test_reflect_lowercase_forced_tables(self):
        """Reflect both tables and check the foreign key between them."""

        # autoloading "t2" on its own; "t1" is then looked up in the same
        # MetaData collection
        single_meta = MetaData(testing.db)
        child = Table(
            quoted_name("t2", quote=True), single_meta, autoload=True
        )
        parent = single_meta.tables["t1"]
        assert child.c.t1id.references(parent.c.id)

        # MetaData.reflect() restricted to the two tables by name
        def is_wanted(name, m):
            return name.lower() in ("t1", "t2")

        bulk_meta = MetaData(testing.db)
        bulk_meta.reflect(only=is_wanted)
        bulk_tables = bulk_meta.tables
        assert bulk_tables["t2"].c.t1id.references(bulk_tables["t1"].c.id)

    def test_get_table_names(self):
        """Check the form of the names returned for ``t1`` and ``t2``."""
        reported = inspect(testing.db).get_table_names()
        found = [name for name in reported if name.lower() in ("t1", "t2")]

        # upper() and lower() must give the same result for each name.
        # NOTE(review): presumably this relies on a quoted_name with
        # quote=True returning itself from both methods - confirm.
        for position in (0, 1):
            name = found[position]
            eq_(name.upper(), name.lower())
1102
1103
# Names of the test classes exported from this module.
__all__ = ("ComponentReflectionTest", "HasTableTest", "NormalizedNameTest")
1105