1# coding: utf-8
2
3
4from sqlalchemy import exc
5from sqlalchemy import FLOAT
6from sqlalchemy import ForeignKey
7from sqlalchemy import ForeignKeyConstraint
8from sqlalchemy import func
9from sqlalchemy import Identity
10from sqlalchemy import Index
11from sqlalchemy import inspect
12from sqlalchemy import INTEGER
13from sqlalchemy import Integer
14from sqlalchemy import MetaData
15from sqlalchemy import Numeric
16from sqlalchemy import PrimaryKeyConstraint
17from sqlalchemy import select
18from sqlalchemy import testing
19from sqlalchemy import text
20from sqlalchemy import Unicode
21from sqlalchemy import UniqueConstraint
22from sqlalchemy.dialects.oracle.base import BINARY_DOUBLE
23from sqlalchemy.dialects.oracle.base import BINARY_FLOAT
24from sqlalchemy.dialects.oracle.base import DOUBLE_PRECISION
25from sqlalchemy.dialects.oracle.base import NUMBER
26from sqlalchemy.testing import assert_raises
27from sqlalchemy.testing import AssertsCompiledSQL
28from sqlalchemy.testing import eq_
29from sqlalchemy.testing import fixtures
30from sqlalchemy.testing import is_
31from sqlalchemy.testing import is_true
32from sqlalchemy.testing.engines import testing_engine
33from sqlalchemy.testing.schema import Column
34from sqlalchemy.testing.schema import Table
35
36
class MultiSchemaTest(fixtures.TestBase, AssertsCompiledSQL):
    """Reflection and DDL tests involving the configured ``test_schema``.

    Class-level setup creates ``parent`` and ``child`` tables in the test
    schema, an unqualified ``local_table``, several synonyms and two
    ``references`` grants; class-level teardown drops the tables and
    synonyms again.
    """

    __only_on__ = "oracle"
    __backend__ = True

    @staticmethod
    def _run_script(script):
        """Run every ``;``-separated statement found in ``script``.

        ``script`` is a ``%``-style template in which ``%(test_schema)s``
        is replaced by ``testing.config.test_schema``.  Fragments that are
        empty after stripping whitespace are skipped.  All statements are
        sent through a single ``testing.db.begin()`` block.
        """
        with testing.db.begin() as conn:
            rendered = script % {"test_schema": testing.config.test_schema}
            for statement in rendered.split(";"):
                if not statement.strip():
                    continue
                conn.exec_driver_sql(statement)

    @classmethod
    def setup_test_class(cls):
        """Create the cross-schema tables, synonyms and grants."""
        # currently assuming full DBA privs for the user.
        # don't really know how else to go here unless
        # we connect as the other user.
        cls._run_script(
            """
    create table %(test_schema)s.parent(
        id integer primary key,
        data varchar2(50)
    );

    COMMENT ON TABLE %(test_schema)s.parent IS 'my table comment';

    create table %(test_schema)s.child(
        id integer primary key,
        data varchar2(50),
        parent_id integer references %(test_schema)s.parent(id)
    );

    create table local_table(
        id integer primary key,
        data varchar2(50)
    );

    create synonym %(test_schema)s.ptable for %(test_schema)s.parent;
    create synonym %(test_schema)s.ctable for %(test_schema)s.child;

    create synonym %(test_schema)s_pt for %(test_schema)s.parent;

    create synonym %(test_schema)s.local_table for local_table;

    -- can't make a ref from local schema to the
    -- remote schema's table without this,
    -- *and* cant give yourself a grant !
    -- so we give it to public.  ideas welcome.
    grant references on %(test_schema)s.parent to public;
    grant references on %(test_schema)s.child to public;
    """
        )

    @classmethod
    def teardown_test_class(cls):
        """Drop the tables and synonyms created by ``setup_test_class``."""
        cls._run_script(
            """
    drop table %(test_schema)s.child;
    drop table %(test_schema)s.parent;
    drop table local_table;
    drop synonym %(test_schema)s.ctable;
    drop synonym %(test_schema)s.ptable;
    drop synonym %(test_schema)s_pt;
    drop synonym %(test_schema)s.local_table;

    """
        )

    def test_create_same_names_explicit_schema(self, metadata, connection):
        """Create parent/child with the default schema named explicitly."""
        default_schema = testing.db.dialect.default_schema_name
        parent_tbl = Table(
            "parent",
            metadata,
            Column("pid", Integer, primary_key=True),
            schema=default_schema,
        )
        child_tbl = Table(
            "child",
            metadata,
            Column("cid", Integer, primary_key=True),
            Column(
                "pid", Integer, ForeignKey("%s.parent.pid" % default_schema)
            ),
            schema=default_schema,
        )
        metadata.create_all(connection)

        connection.execute(parent_tbl.insert(), {"pid": 1})
        connection.execute(child_tbl.insert(), {"cid": 1, "pid": 1})
        rows = connection.execute(child_tbl.select()).fetchall()
        eq_(rows, [(1, 1)])

    def test_reflect_alt_table_owner_local_synonym(self):
        """Reflect the ``<test_schema>_pt`` synonym with synonym resolution."""
        names = {"test_schema": testing.config.test_schema}
        synonym_tbl = Table(
            "%s_pt" % testing.config.test_schema,
            MetaData(),
            autoload_with=testing.db,
            oracle_resolve_synonyms=True,
        )
        expected_sql = (
            "SELECT %(test_schema)s_pt.id, "
            "%(test_schema)s_pt.data FROM %(test_schema)s_pt"
        ) % names
        self.assert_compile(synonym_tbl.select(), expected_sql)

    def test_reflect_alt_synonym_owner_local_table(self):
        """Reflect ``local_table`` through the test schema's synonym."""
        names = {"test_schema": testing.config.test_schema}
        synonym_tbl = Table(
            "local_table",
            MetaData(),
            autoload_with=testing.db,
            oracle_resolve_synonyms=True,
            schema=testing.config.test_schema,
        )
        expected_sql = (
            "SELECT %(test_schema)s.local_table.id, "
            "%(test_schema)s.local_table.data "
            "FROM %(test_schema)s.local_table"
        ) % names
        self.assert_compile(synonym_tbl.select(), expected_sql)

    def test_create_same_names_implicit_schema(self, metadata, connection):
        """Create parent/child without any schema qualifier."""
        parent_tbl = Table(
            "parent", metadata, Column("pid", Integer, primary_key=True)
        )
        child_tbl = Table(
            "child",
            metadata,
            Column("cid", Integer, primary_key=True),
            Column("pid", Integer, ForeignKey("parent.pid")),
        )
        metadata.create_all(connection)

        connection.execute(parent_tbl.insert(), {"pid": 1})
        connection.execute(child_tbl.insert(), {"cid": 1, "pid": 1})
        rows = connection.execute(child_tbl.select()).fetchall()
        eq_(rows, [(1, 1)])

    def test_reflect_alt_owner_explicit(self):
        """Reflect parent/child from the test schema and join them."""
        schema_name = testing.config.test_schema
        meta = MetaData()
        # "parent" is reflected before "child", as in the join below.
        parent_tbl, child_tbl = [
            Table(name, meta, autoload_with=testing.db, schema=schema_name)
            for name in ("parent", "child")
        ]
        joined = parent_tbl.join(child_tbl)

        expected_sql = (
            "%(test_schema)s.parent JOIN %(test_schema)s.child ON "
            "%(test_schema)s.parent.id = %(test_schema)s.child.parent_id"
        ) % {"test_schema": schema_name}
        self.assert_compile(joined, expected_sql)

        with testing.db.connect() as conn:
            stmt = select(parent_tbl, child_tbl).select_from(
                parent_tbl.join(child_tbl)
            )
            conn.execute(stmt).fetchall()

        # check table comment (#5146)
        eq_(parent_tbl.comment, "my table comment")

    def test_reflect_table_comment(self, metadata, connection):
        """Table comments are looked up per schema for same-named tables."""
        Table(
            "parent",
            metadata,
            Column("q", Integer),
            comment="my local comment",
        ).create(connection)

        insp = inspect(connection)
        # (keyword arguments for get_table_comment, expected comment text)
        cases = [
            ({"schema": testing.config.test_schema}, "my table comment"),
            ({}, "my local comment"),
            (
                {"schema": connection.dialect.default_schema_name},
                "my local comment",
            ),
        ]
        for lookup_kw, expected_text in cases:
            eq_(
                insp.get_table_comment("parent", **lookup_kw),
                {"text": expected_text},
            )

    def test_reflect_local_to_remote(self, connection):
        """Reflect a local table whose FK targets the test schema's parent."""
        names = {"test_schema": testing.config.test_schema}
        connection.exec_driver_sql(
            "CREATE TABLE localtable (id INTEGER "
            "PRIMARY KEY, parent_id INTEGER REFERENCES "
            "%(test_schema)s.parent(id))" % names,
        )
        try:
            meta = MetaData()
            local_tbl = Table("localtable", meta, autoload_with=testing.db)
            # the referenced table is expected in the same MetaData
            remote_parent = meta.tables[
                "%s.parent" % testing.config.test_schema
            ]
            expected_sql = (
                "%(test_schema)s.parent JOIN localtable ON "
                "%(test_schema)s.parent.id = "
                "localtable.parent_id"
            ) % names
            self.assert_compile(remote_parent.join(local_tbl), expected_sql)
        finally:
            connection.exec_driver_sql("DROP TABLE localtable")

    def test_reflect_alt_owner_implicit(self):
        """Reflect parent/child from the test schema, join and select."""
        schema_name = testing.config.test_schema
        meta = MetaData()
        # "parent" is reflected before "child".
        parent_tbl, child_tbl = [
            Table(name, meta, autoload_with=testing.db, schema=schema_name)
            for name in ("parent", "child")
        ]

        expected_sql = (
            "%(test_schema)s.parent JOIN %(test_schema)s.child "
            "ON %(test_schema)s.parent.id = "
            "%(test_schema)s.child.parent_id"
        ) % {"test_schema": schema_name}
        self.assert_compile(parent_tbl.join(child_tbl), expected_sql)

        with testing.db.connect() as conn:
            stmt = select(parent_tbl, child_tbl).select_from(
                parent_tbl.join(child_tbl)
            )
            conn.execute(stmt).fetchall()

    def test_reflect_alt_owner_synonyms(self, connection):
        """Reflect a local table whose FK targets the ``ptable`` synonym."""
        connection.exec_driver_sql(
            "CREATE TABLE localtable (id INTEGER "
            "PRIMARY KEY, parent_id INTEGER REFERENCES "
            "%s.ptable(id))" % testing.config.test_schema,
        )
        try:
            meta = MetaData()
            local_tbl = Table(
                "localtable",
                meta,
                autoload_with=connection,
                oracle_resolve_synonyms=True,
            )
            synonym_parent = meta.tables[
                "%s.ptable" % testing.config.test_schema
            ]
            expected_sql = (
                "%(test_schema)s.ptable JOIN localtable ON "
                "%(test_schema)s.ptable.id = "
                "localtable.parent_id"
            ) % {"test_schema": testing.config.test_schema}
            self.assert_compile(synonym_parent.join(local_tbl), expected_sql)

            stmt = select(synonym_parent, local_tbl).select_from(
                synonym_parent.join(local_tbl)
            )
            connection.execute(stmt).fetchall()
        finally:
            connection.exec_driver_sql("DROP TABLE localtable")

    def test_reflect_remote_synonyms(self):
        """Reflect the ``ptable`` / ``ctable`` synonyms and join them."""
        schema_name = testing.config.test_schema
        meta = MetaData()
        # "ptable" is reflected before "ctable".
        parent_syn, child_syn = [
            Table(
                name,
                meta,
                autoload_with=testing.db,
                schema=schema_name,
                oracle_resolve_synonyms=True,
            )
            for name in ("ptable", "ctable")
        ]
        expected_sql = (
            "%(test_schema)s.ptable JOIN "
            "%(test_schema)s.ctable "
            "ON %(test_schema)s.ptable.id = "
            "%(test_schema)s.ctable.parent_id"
        ) % {"test_schema": schema_name}
        self.assert_compile(parent_syn.join(child_syn), expected_sql)
334
335
class ConstraintTest(fixtures.TablesTest):
    """Constraint-related checks run against a single ``foo`` table."""

    __only_on__ = "oracle"
    __backend__ = True
    run_deletes = None

    @classmethod
    def define_tables(cls, metadata):
        """Define the ``foo`` table with an integer primary key ``id``."""
        Table("foo", metadata, Column("id", Integer, primary_key=True))

    def test_oracle_has_no_on_update_cascade(self, connection):
        """Creating a table with an ON UPDATE CASCADE FK raises SAWarning.

        Covers both the column-level ``ForeignKey`` and the table-level
        ``ForeignKeyConstraint`` spelling.
        """
        fk_on_column = Table(
            "bar",
            self.tables_test_metadata,
            Column("id", Integer, primary_key=True),
            Column(
                "foo_id", Integer, ForeignKey("foo.id", onupdate="CASCADE")
            ),
        )
        assert_raises(exc.SAWarning, fk_on_column.create, connection)

        fk_on_table = Table(
            "bat",
            self.tables_test_metadata,
            Column("id", Integer, primary_key=True),
            Column("foo_id", Integer),
            ForeignKeyConstraint(["foo_id"], ["foo.id"], onupdate="CASCADE"),
        )
        assert_raises(exc.SAWarning, fk_on_table.create, connection)

    def test_reflect_check_include_all(self, connection):
        """``include_all=True`` additionally returns the NOT NULL check."""
        insp = inspect(connection)
        eq_(insp.get_check_constraints("foo"), [])

        every_check = insp.get_check_constraints("foo", include_all=True)
        eq_([rec["sqltext"] for rec in every_check], ['"ID" IS NOT NULL'])
376
377
class SystemTableTablenamesTest(fixtures.TestBase):
    """Table-name reflection with a regular table, a global temporary
    table and a table placed in the SYSTEM tablespace."""

    __only_on__ = "oracle"
    __backend__ = True

    def setup_test(self):
        """Create the three tables used by every test in this class."""
        create_statements = (
            "create table my_table (id integer)",
            "create global temporary table my_temp_table (id integer)",
            "create table foo_table (id integer) tablespace SYSTEM",
        )
        with testing.db.begin() as conn:
            for ddl in create_statements:
                conn.exec_driver_sql(ddl)

    def teardown_test(self):
        """Drop the tables created by ``setup_test``."""
        drop_statements = (
            "drop table my_temp_table",
            "drop table my_table",
            "drop table foo_table",
        )
        with testing.db.begin() as conn:
            for ddl in drop_statements:
                conn.exec_driver_sql(ddl)

    def test_table_names_no_system(self):
        """With the default engine only ``my_table`` is reported."""
        table_names = inspect(testing.db).get_table_names()
        eq_(table_names, ["my_table"])

    def test_temp_table_names_no_system(self):
        """Only ``my_temp_table`` is reported as a temporary table."""
        temp_names = inspect(testing.db).get_temp_table_names()
        eq_(temp_names, ["my_temp_table"])

    def test_table_names_w_system(self):
        """With ``exclude_tablespaces=["FOO"]`` both tables are reported."""
        engine = testing_engine(options={"exclude_tablespaces": ["FOO"]})
        wanted = {"my_table", "foo_table"}
        reported = set(inspect(engine).get_table_names())
        # other tables may exist, so only the two known names are compared
        eq_(reported & wanted, wanted)
415
416
class DontReflectIOTTest(fixtures.TestBase):
    """Verify that index overflow tables are left out of the reflected
    table names."""

    __only_on__ = "oracle"
    __backend__ = True

    def setup_test(self):
        """Create an index-organized table with an overflow segment."""
        create_iot = """
            CREATE TABLE admin_docindex(
                    token char(20),
                    doc_id NUMBER,
                    token_frequency NUMBER,
                    token_offsets VARCHAR2(2000),
                    CONSTRAINT pk_admin_docindex PRIMARY KEY (token, doc_id))
                ORGANIZATION INDEX
                TABLESPACE users
                PCTTHRESHOLD 20
                OVERFLOW TABLESPACE users
            """
        with testing.db.begin() as conn:
            conn.exec_driver_sql(create_iot)

    def teardown_test(self):
        """Drop the index-organized table."""
        with testing.db.begin() as conn:
            conn.exec_driver_sql("drop table admin_docindex")

    def test_reflect_all(self, connection):
        """A full reflect yields ``admin_docindex`` and nothing else."""
        reflected = MetaData()
        reflected.reflect(connection)
        found_names = {tbl.name for tbl in reflected.tables.values()}
        eq_(found_names, {"admin_docindex"})
449
450
def all_tables_compression_missing():
    """Return True when the server is neither Enterprise Edition nor 18+.

    The edition is read from the first value returned by
    ``select * from v$version``; the version comes from the dialect's
    ``server_version_info``.
    """
    with testing.db.connect() as conn:
        banner = conn.exec_driver_sql("select * from v$version").scalar()
        # this works in Oracle Database 18c Express Edition Release
        return (
            "Enterprise Edition" not in banner
            and testing.db.dialect.server_version_info < (18,)
        )
460
461
def all_tables_compress_for_missing():
    """Return True when the server banner lacks "Enterprise Edition".

    The banner is the first value returned by ``select * from v$version``.
    """
    with testing.db.connect() as conn:
        banner = conn.exec_driver_sql("select * from v$version").scalar()
        return "Enterprise Edition" not in banner
470
471
class TableReflectionTest(fixtures.TestBase):
    """Reflection of the ``oracle_compress`` table option."""

    __only_on__ = "oracle"
    __backend__ = True

    @testing.fails_if(all_tables_compression_missing)
    def test_reflect_basic_compression(self, metadata, connection):
        """``oracle_compress=True`` reflects as some truthy value."""
        Table(
            "test_compress",
            metadata,
            Column("data", Integer, primary_key=True),
            oracle_compress=True,
        )
        metadata.create_all(connection)

        reflected = Table(
            "test_compress", MetaData(), autoload_with=connection
        )
        # Don't hardcode the exact value, but it must be non-empty
        assert reflected.dialect_options["oracle"]["compress"]

    @testing.fails_if(all_tables_compress_for_missing)
    def test_reflect_oltp_compression(self, metadata, connection):
        """``oracle_compress="OLTP"`` reflects back as ``"OLTP"``."""
        Table(
            "test_compress",
            metadata,
            Column("data", Integer, primary_key=True),
            oracle_compress="OLTP",
        )
        metadata.create_all(connection)

        reflected = Table(
            "test_compress", MetaData(), autoload_with=connection
        )
        assert reflected.dialect_options["oracle"]["compress"] == "OLTP"
507
508
class RoundTripIndexTest(fixtures.TestBase):
    """Create tables with indexes and constraints, then reflect them back."""

    __only_on__ = "oracle"
    __backend__ = True

    def test_no_pk(self, metadata, connection):
        """Two unique indexes on a table without a primary key are both
        returned by ``get_indexes()``."""
        Table(
            "sometable",
            metadata,
            Column("id_a", Unicode(255)),
            Column("id_b", Unicode(255)),
            Index("pk_idx_1", "id_a", "id_b", unique=True),
            Index("pk_idx_2", "id_b", "id_a", unique=True),
        )
        metadata.create_all(connection)

        insp = inspect(connection)
        eq_(
            insp.get_indexes("sometable"),
            [
                {
                    "name": "pk_idx_1",
                    "column_names": ["id_a", "id_b"],
                    "dialect_options": {},
                    "unique": True,
                },
                {
                    "name": "pk_idx_2",
                    "column_names": ["id_b", "id_a"],
                    "dialect_options": {},
                    "unique": True,
                },
            ],
        )

    @testing.combinations((True,), (False,), argnames="explicit_pk")
    def test_include_indexes_resembling_pk(
        self, metadata, connection, explicit_pk
    ):
        """Unique indexes over the primary key columns, in a different
        column order, are still returned by ``get_indexes()``.

        Runs once with an explicitly named ``PrimaryKeyConstraint`` and
        once with the primary key implied by the columns.
        """

        t = Table(
            "sometable",
            metadata,
            Column("id_a", Unicode(255), primary_key=True),
            Column("id_b", Unicode(255), primary_key=True),
            Column("group", Unicode(255), primary_key=True),
            Column("col", Unicode(255)),
            # Oracle won't let you do this unless the indexes have
            # the columns in different order
            Index("pk_idx_1", "id_b", "id_a", "group", unique=True),
            Index("pk_idx_2", "id_b", "group", "id_a", unique=True),
        )
        if explicit_pk:
            t.append_constraint(
                PrimaryKeyConstraint(
                    "id_a", "id_b", "group", name="some_primary_key"
                )
            )
        metadata.create_all(connection)

        insp = inspect(connection)
        eq_(
            insp.get_indexes("sometable"),
            [
                {
                    "name": "pk_idx_1",
                    "column_names": ["id_b", "id_a", "group"],
                    "dialect_options": {},
                    "unique": True,
                },
                {
                    "name": "pk_idx_2",
                    "column_names": ["id_b", "group", "id_a"],
                    "dialect_options": {},
                    "unique": True,
                },
            ],
        )

    def test_reflect_fn_index(self, metadata, connection):
        """test reflection of a functional index.

        it appears this emitted a warning at some point but does not right now.
        the returned data is not exactly correct, but this is what it's
        likely been doing for many years.

        """

        s_table = Table(
            "sometable",
            metadata,
            Column("group", Unicode(255), primary_key=True),
            Column("col", Unicode(255)),
        )

        Index("data_idx", func.upper(s_table.c.col))

        metadata.create_all(connection)

        eq_(
            inspect(connection).get_indexes("sometable"),
            [
                {
                    "column_names": [],
                    "dialect_options": {},
                    "name": "data_idx",
                    "unique": False,
                }
            ],
        )

    def test_basic(self, metadata, connection):
        """Round trip a table's indexes and constraints.

        The table is created, reflected into a second ``MetaData``,
        dropped, re-created from the reflected definition and reflected a
        final time; the assertions run against that final reflection.
        """

        s_table = Table(
            "sometable",
            metadata,
            Column("id_a", Unicode(255), primary_key=True),
            Column("id_b", Unicode(255), primary_key=True, unique=True),
            Column("group", Unicode(255), primary_key=True),
            Column("col", Unicode(255)),
            UniqueConstraint("col", "group"),
        )

        # "group" is a keyword, so lower case
        normalind = Index("tableind", s_table.c.id_b, s_table.c.group)
        Index(
            "compress1", s_table.c.id_a, s_table.c.id_b, oracle_compress=True
        )
        Index(
            "compress2",
            s_table.c.id_a,
            s_table.c.id_b,
            s_table.c.col,
            oracle_compress=1,
        )

        metadata.create_all(connection)

        mirror = MetaData()
        mirror.reflect(connection)

        metadata.drop_all(connection)
        mirror.create_all(connection)

        # NOTE: deliberately not named ``inspect``; that would shadow the
        # module-level ``sqlalchemy.inspect`` function inside this test.
        reflected_meta = MetaData()
        reflected_meta.reflect(connection)

        def obj_definition(obj):
            # identity of an index/constraint: class, column names, unique
            return (
                obj.__class__,
                tuple(c.name for c in obj.columns),
                getattr(obj, "unique", None),
            )

        # find what the primary key constraint name should be
        primaryconsname = connection.scalar(
            text(
                """SELECT constraint_name
               FROM all_constraints
               WHERE table_name = :table_name
               AND owner = :owner
               AND constraint_type = 'P' """
            ),
            dict(
                table_name=s_table.name.upper(),
                owner=testing.db.dialect.default_schema_name.upper(),
            ),
        )

        reflectedtable = reflected_meta.tables[s_table.name]

        # make a dictionary of the reflected objects:

        reflected = {
            obj_definition(i): i
            for i in reflectedtable.indexes | reflectedtable.constraints
        }

        # assert we got primary key constraint and its name, Error
        # if not in dict

        assert (
            reflected[
                (PrimaryKeyConstraint, ("id_a", "id_b", "group"), None)
            ].name.upper()
            == primaryconsname.upper()
        )

        # Error if not in dict

        eq_(reflected[(Index, ("id_b", "group"), False)].name, normalind.name)
        assert (Index, ("id_b",), True) in reflected
        assert (Index, ("col", "group"), True) in reflected

        # oracle_compress=True on a two-column index is expected to
        # reflect as 2
        idx = reflected[(Index, ("id_a", "id_b"), False)]
        assert idx.dialect_options["oracle"]["compress"] == 2

        idx = reflected[(Index, ("id_a", "id_b", "col"), False)]
        assert idx.dialect_options["oracle"]["compress"] == 1

        eq_(len(reflectedtable.constraints), 1)
        eq_(len(reflectedtable.indexes), 5)
712
713
class DBLinkReflectionTest(fixtures.TestBase):
    """Reflect a table through a synonym that points across a DB link."""

    __requires__ = ("oracle_test_dblink",)
    __only_on__ = "oracle"
    __backend__ = True

    @classmethod
    def setup_test_class(cls):
        """Create ``test_table`` and a synonym for it over the DB link.

        The link name is read from the ``oracle_db_link`` option of the
        ``sqla_testing`` section of the test configuration.
        """
        cls.dblink = testing.config.file_config.get(
            "sqla_testing", "oracle_db_link"
        )

        # note that the synonym here is still not totally functional
        # when accessing via a different username as we do with the
        # multiprocess test suite, so testing here is minimal
        create_table = (
            "create table test_table "
            "(id integer primary key, data varchar2(50))"
        )
        create_synonym = (
            "create synonym test_table_syn "
            "for test_table@%s" % cls.dblink
        )
        with testing.db.begin() as conn:
            conn.exec_driver_sql(create_table)
            conn.exec_driver_sql(create_synonym)

    @classmethod
    def teardown_test_class(cls):
        """Drop the synonym and then the table."""
        with testing.db.begin() as conn:
            for ddl in (
                "drop synonym test_table_syn",
                "drop table test_table",
            ):
                conn.exec_driver_sql(ddl)

    def test_reflection(self):
        """test the resolution of the synonym/dblink."""
        reflected = Table(
            "test_table_syn",
            MetaData(),
            autoload_with=testing.db,
            oracle_resolve_synonyms=True,
        )
        eq_(list(reflected.c.keys()), ["id", "data"])
        eq_(list(reflected.primary_key), [reflected.c.id])
756
757
class TypeReflectionTest(fixtures.TestBase):
    """Create columns of given types and compare the reflected types."""

    __only_on__ = "oracle"
    __backend__ = True

    def _run_test(self, metadata, connection, specs, attributes):
        """Create ``oracle_types`` from ``specs`` and verify its reflection.

        :param specs: sequence whose items hold the type used to create
         the column at index 0 and the type instance expected from
         reflection at index 1.  Columns are named ``c1``, ``c2``, ...
        :param attributes: attribute names that must compare equal
         between each reflected type and its expected type.
        """
        columns = [
            Column("c%i" % num, spec[0]) for num, spec in enumerate(specs, 1)
        ]
        Table("oracle_types", metadata, *columns)
        metadata.create_all(connection)

        reflected_tbl = Table(
            "oracle_types", MetaData(), autoload_with=connection
        )
        for num, (col, spec) in enumerate(zip(reflected_tbl.c, specs), 1):
            wanted_type = spec[1]
            actual_type = col.type
            # exact class match, not isinstance
            is_(type(actual_type), type(wanted_type))
            for attr in attributes:
                actual_value = getattr(actual_type, attr)
                wanted_value = getattr(wanted_type, attr)
                message = (
                    "Column %s: Attribute %s value of %s does not "
                    "match %s for type %s"
                    % ("c%i" % num, attr, actual_value, wanted_value, spec[0])
                )
                eq_(actual_value, wanted_value, message)

    def test_integer_types(self, metadata, connection):
        """``Integer`` and ``Numeric`` are expected to reflect as INTEGER."""
        self._run_test(
            metadata,
            connection,
            [(Integer, INTEGER()), (Numeric, INTEGER())],
            [],
        )

    def test_number_types(
        self,
        metadata,
        connection,
    ):
        """NUMBER types are compared on ``precision`` and ``scale``."""
        self._run_test(
            metadata,
            connection,
            [(Numeric(5, 2), NUMBER(5, 2)), (NUMBER, NUMBER())],
            ["precision", "scale"],
        )

    def test_float_types(
        self,
        metadata,
        connection,
    ):
        """Float-like types are compared on ``precision``."""
        float_specs = [
            (DOUBLE_PRECISION(), FLOAT()),
            # when binary_precision is supported
            # (DOUBLE_PRECISION(), oracle.FLOAT(binary_precision=126)),
            (BINARY_DOUBLE(), BINARY_DOUBLE()),
            (BINARY_FLOAT(), BINARY_FLOAT()),
            (FLOAT(5), FLOAT()),
            # when binary_precision is supported
            # (FLOAT(5), oracle.FLOAT(binary_precision=5),),
            (FLOAT(), FLOAT()),
            # when binary_precision is supported
            # (FLOAT(5), oracle.FLOAT(binary_precision=126),),
        ]
        self._run_test(metadata, connection, float_specs, ["precision"])
819
820
class IdentityReflectionTest(fixtures.TablesTest):
    """Reflection of ``Identity`` options on columns."""

    __only_on__ = "oracle"
    __backend__ = True
    __requires__ = ("identity_columns",)

    @classmethod
    def define_tables(cls, metadata):
        """Define ``t1`` (``on_null=True``) and ``t2`` (``order=True``)."""
        Table("t1", metadata, Column("id1", Integer, Identity(on_null=True)))
        Table("t2", metadata, Column("id2", Integer, Identity(order=True)))

    def test_reflect_identity(self):
        """Each identity column reflects the common values plus its own
        explicitly set option."""
        insp = inspect(testing.db)
        baseline = {
            "always": False,
            "start": 1,
            "increment": 1,
            "on_null": False,
            "maxvalue": 10 ** 28 - 1,
            "minvalue": 1,
            "cycle": False,
            "cache": 20,
            "order": False,
        }
        # column name -> identity options that differ from the baseline
        overrides = {
            "id1": {"on_null": True},
            "id2": {"order": True},
        }
        reflected_columns = insp.get_columns("t1") + insp.get_columns("t2")
        for col in reflected_columns:
            changed = overrides.get(col["name"])
            if changed is None:
                continue
            is_true("identity" in col)
            expected = baseline.copy()
            expected.update(changed)
            eq_(col["identity"], expected)
855