1# oracle/base.py
2# Copyright (C) 2005-2021 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8r"""
9.. dialect:: oracle
10    :name: Oracle
11    :full_support: 11.2, 18c
12    :normal_support: 11+
13    :best_effort: 8+
14
15
16Auto Increment Behavior
17-----------------------
18
19SQLAlchemy Table objects which include integer primary keys are usually
20assumed to have "autoincrementing" behavior, meaning they can generate their
21own primary key values upon INSERT. For use within Oracle, two options are
22available, which are the use of IDENTITY columns (Oracle 12 and above only)
23or the association of a SEQUENCE with the column.
24
25Specifying GENERATED AS IDENTITY (Oracle 12 and above)
26~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
27
Starting from version 12, Oracle can make use of identity columns using
the :class:`_sql.Identity` construct to specify the autoincrementing behavior::
30
31    t = Table('mytable', metadata,
32        Column('id', Integer, Identity(start=3), primary_key=True),
33        Column(...), ...
34    )
35
36The CREATE TABLE for the above :class:`_schema.Table` object would be:
37
38.. sourcecode:: sql
39
40    CREATE TABLE mytable (
41        id INTEGER GENERATED BY DEFAULT AS IDENTITY (START WITH 3),
42        ...,
43        PRIMARY KEY (id)
44    )
45
The :class:`_schema.Identity` object supports many options to control the
"autoincrementing" behavior of the column, such as the starting value, the
increment value, etc.
49In addition to the standard options, Oracle supports setting
50:paramref:`_schema.Identity.always` to ``None`` to use the default
51generated mode, rendering GENERATED AS IDENTITY in the DDL. It also supports
52setting :paramref:`_schema.Identity.on_null` to ``True`` to specify ON NULL
53in conjunction with a 'BY DEFAULT' identity column.
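
For example, a brief sketch (the table name is illustrative) of a
'BY DEFAULT' identity column that additionally specifies ON NULL::

    t = Table(
        'mytable', metadata,
        Column(
            'id', Integer,
            Identity(on_null=True, start=1),
            primary_key=True
        )
    )

The DDL rendered for the above column would be expected to include a clause
along the lines of
``GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH 1)``.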
54
55Using a SEQUENCE (all Oracle versions)
56~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
57
Older versions of Oracle had no "autoincrement" feature, so SQLAlchemy relies
upon sequences to produce these values.   With the older Oracle versions, *a
sequence must always be explicitly specified to enable autoincrement*.  This
is divergent from the majority of documentation examples, which assume the
usage of an autoincrement-capable database.   To specify sequences, use the
sqlalchemy.schema.Sequence object, which is passed to a Column construct::
65
66  t = Table('mytable', metadata,
67        Column('id', Integer, Sequence('id_seq'), primary_key=True),
68        Column(...), ...
69  )
70
This step is also required when using table reflection, i.e. ``autoload_with=engine``::
72
73  t = Table('mytable', metadata,
74        Column('id', Integer, Sequence('id_seq'), primary_key=True),
75        autoload_with=engine
76  )
77
78.. versionchanged::  1.4   Added :class:`_schema.Identity` construct
79   in a :class:`_schema.Column` to specify the option of an autoincrementing
80   column.
81
82.. _oracle_isolation_level:
83
84Transaction Isolation Level / Autocommit
85----------------------------------------
86
87The Oracle database supports "READ COMMITTED" and "SERIALIZABLE" modes of
88isolation. The AUTOCOMMIT isolation level is also supported by the cx_Oracle
89dialect.
90
91To set using per-connection execution options::
92
93    connection = engine.connect()
94    connection = connection.execution_options(
95        isolation_level="AUTOCOMMIT"
96    )
97
98For ``READ COMMITTED`` and ``SERIALIZABLE``, the Oracle dialect sets the
99level at the session level using ``ALTER SESSION``, which is reverted back
100to its default setting when the connection is returned to the connection
101pool.
102
103Valid values for ``isolation_level`` include:
104
105* ``READ COMMITTED``
106* ``AUTOCOMMIT``
107* ``SERIALIZABLE``
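
For ``SERIALIZABLE``, a minimal sketch using the same per-connection approach
shown above, combined with an explicit transaction (the query is illustrative
only)::

    from sqlalchemy import text

    with engine.connect() as connection:
        connection = connection.execution_options(
            isolation_level="SERIALIZABLE"
        )
        with connection.begin():
            # statements in this block run under SERIALIZABLE; the
            # ALTER SESSION setting is reverted when the connection
            # is returned to the pool
            connection.execute(text("SELECT 1 FROM DUAL"))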
108
109.. note:: The implementation for the
110   :meth:`_engine.Connection.get_isolation_level` method as implemented by the
111   Oracle dialect necessarily forces the start of a transaction using the
112   Oracle LOCAL_TRANSACTION_ID function; otherwise no level is normally
113   readable.
114
115   Additionally, the :meth:`_engine.Connection.get_isolation_level` method will
116   raise an exception if the ``v$transaction`` view is not available due to
117   permissions or other reasons, which is a common occurrence in Oracle
118   installations.
119
120   The cx_Oracle dialect attempts to call the
121   :meth:`_engine.Connection.get_isolation_level` method when the dialect makes
   its first connection to the database in order to acquire the
   "default" isolation level.  This default level is necessary so that the
   level can be reset on a connection after it has been temporarily modified
   using the :meth:`_engine.Connection.execution_options` method.   In the
   common event that the :meth:`_engine.Connection.get_isolation_level`
   method raises an exception due to ``v$transaction`` not being readable, or
   due to any other database-related failure, the level is assumed to be
   "READ COMMITTED".  No
129   warning is emitted for this initial first-connect condition as it is
130   expected to be a common restriction on Oracle databases.
131
132.. versionadded:: 1.3.16 added support for AUTOCOMMIT to the cx_oracle dialect
133   as well as the notion of a default isolation level
134
135.. versionadded:: 1.3.21 Added support for SERIALIZABLE as well as live
136   reading of the isolation level.
137
138.. versionchanged:: 1.3.22 In the event that the default isolation
139   level cannot be read due to permissions on the v$transaction view as
140   is common in Oracle installations, the default isolation level is hardcoded
141   to "READ COMMITTED" which was the behavior prior to 1.3.21.
142
143.. seealso::
144
145    :ref:`dbapi_autocommit`
146
147Identifier Casing
148-----------------
149
150In Oracle, the data dictionary represents all case insensitive identifier
names using UPPERCASE text.   SQLAlchemy, on the other hand, considers an
all-lowercase identifier name to be case insensitive.   The Oracle dialect
153converts all case insensitive identifiers to and from those two formats during
154schema level communication, such as reflection of tables and indexes.   Using
155an UPPERCASE name on the SQLAlchemy side indicates a case sensitive
156identifier, and SQLAlchemy will quote the name - this will cause mismatches
157against data dictionary data received from Oracle, so unless identifier names
158have been truly created as case sensitive (i.e. using quoted names), all
159lowercase names should be used on the SQLAlchemy side.
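
As a brief sketch of this convention (the table definitions are illustrative),
a lowercase name corresponds to the uppercase name stored in the data
dictionary, while an uppercase name is treated as case sensitive::

    # case insensitive on the SQLAlchemy side; corresponds to the
    # data dictionary name "MY_TABLE"
    t1 = Table("my_table", metadata, Column("id", Integer))

    # indicates a case sensitive name; SQLAlchemy will quote it, which
    # can cause mismatches against names received from the data dictionary
    t2 = Table("MY_TABLE", metadata, Column("id", Integer))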
160
161.. _oracle_max_identifier_lengths:
162
163Max Identifier Lengths
164----------------------
165
166Oracle has changed the default max identifier length as of Oracle Server
167version 12.2.   Prior to this version, the length was 30, and for 12.2 and
168greater it is now 128.   This change impacts SQLAlchemy in the area of
169generated SQL label names as well as the generation of constraint names,
170particularly in the case where the constraint naming convention feature
171described at :ref:`constraint_naming_conventions` is being used.
172
173To assist with this change and others, Oracle includes the concept of a
174"compatibility" version, which is a version number that is independent of the
175actual server version in order to assist with migration of Oracle databases,
176and may be configured within the Oracle server itself. This compatibility
177version is retrieved using the query  ``SELECT value FROM v$parameter WHERE
178name = 'compatible';``.   The SQLAlchemy Oracle dialect, when tasked with
179determining the default max identifier length, will attempt to use this query
180upon first connect in order to determine the effective compatibility version of
181the server, which determines what the maximum allowed identifier length is for
the server.  If the view is not available, the server version information is
183used instead.
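
As a sketch, the effective compatibility version may also be checked manually
using the same query that the dialect itself issues::

    with engine.connect() as conn:
        compat = conn.exec_driver_sql(
            "SELECT value FROM v$parameter WHERE name = 'compatible'"
        ).scalar()
        print(compat)  # e.g. "19.0.0"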
184
185As of SQLAlchemy 1.4, the default max identifier length for the Oracle dialect
186is 128 characters.  Upon first connect, the compatibility version is detected
187and if it is less than Oracle version 12.2, the max identifier length is
188changed to be 30 characters.  In all cases, setting the
189:paramref:`_sa.create_engine.max_identifier_length` parameter will bypass this
190change and the value given will be used as is::
191
192    engine = create_engine(
193        "oracle+cx_oracle://scott:tiger@oracle122",
194        max_identifier_length=30)
195
196The maximum identifier length comes into play both when generating anonymized
SQL labels in SELECT statements, and more crucially when generating constraint
198names from a naming convention.  It is this area that has created the need for
199SQLAlchemy to change this default conservatively.   For example, the following
200naming convention produces two very different constraint names based on the
201identifier length::
202
203    from sqlalchemy import Column
204    from sqlalchemy import Index
205    from sqlalchemy import Integer
206    from sqlalchemy import MetaData
207    from sqlalchemy import Table
208    from sqlalchemy.dialects import oracle
209    from sqlalchemy.schema import CreateIndex
210
211    m = MetaData(naming_convention={"ix": "ix_%(column_0N_name)s"})
212
213    t = Table(
214        "t",
215        m,
216        Column("some_column_name_1", Integer),
217        Column("some_column_name_2", Integer),
218        Column("some_column_name_3", Integer),
219    )
220
221    ix = Index(
222        None,
223        t.c.some_column_name_1,
224        t.c.some_column_name_2,
225        t.c.some_column_name_3,
226    )
227
228    oracle_dialect = oracle.dialect(max_identifier_length=30)
229    print(CreateIndex(ix).compile(dialect=oracle_dialect))
230
231With an identifier length of 30, the above CREATE INDEX looks like::
232
233    CREATE INDEX ix_some_column_name_1s_70cd ON t
234    (some_column_name_1, some_column_name_2, some_column_name_3)
235
236However with length=128, it becomes::
237
238    CREATE INDEX ix_some_column_name_1some_column_name_2some_column_name_3 ON t
239    (some_column_name_1, some_column_name_2, some_column_name_3)
240
Applications which have run versions of SQLAlchemy prior to 1.4 on an Oracle
242server version 12.2 or greater are therefore subject to the scenario of a
243database migration that wishes to "DROP CONSTRAINT" on a name that was
244previously generated with the shorter length.  This migration will fail when
245the identifier length is changed without the name of the index or constraint
246first being adjusted.  Such applications are strongly advised to make use of
247:paramref:`_sa.create_engine.max_identifier_length`
248in order to maintain control
249of the generation of truncated names, and to fully review and test all database
250migrations in a staging environment when changing this value to ensure that the
251impact of this change has been mitigated.
252
253.. versionchanged:: 1.4 the default max_identifier_length for Oracle is 128
254   characters, which is adjusted down to 30 upon first connect if an older
255   version of Oracle server (compatibility version < 12.2) is detected.
256
257
258LIMIT/OFFSET Support
259--------------------
260
261Oracle has no direct support for LIMIT and OFFSET until version 12c.
262To achieve this behavior across all widely used versions of Oracle starting
263with the 8 series, SQLAlchemy currently makes use of ROWNUM to achieve
264LIMIT/OFFSET; the exact methodology is taken from
265https://blogs.oracle.com/oraclemagazine/on-rownum-and-limiting-results .
266
267There is currently a single option to affect its behavior:
268
269* the "FIRST_ROWS()" optimization keyword is not used by default.  To enable
270  the usage of this optimization directive, specify ``optimize_limits=True``
  to :func:`_sa.create_engine`, as in the sketch below.
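
For example (the URL is illustrative)::

    engine = create_engine(
        "oracle+cx_oracle://scott:tiger@dsn",
        # prepends a /*+ FIRST_ROWS(n) */ hint to the limiting SELECT
        optimize_limits=True,
    )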
272
273.. versionchanged:: 1.4
274    The Oracle dialect renders limit/offset integer values using a "post
275    compile" scheme which renders the integer directly before passing the
276    statement to the cursor for execution.   The ``use_binds_for_limits`` flag
277    no longer has an effect.
278
279    .. seealso::
280
281        :ref:`change_4808`.
282
283Support for changing the row number strategy, which would include one that
284makes use of the ``row_number()`` window function as well as one that makes
use of the Oracle 12c "FETCH FIRST N ROW / OFFSET N ROWS" keywords, may be
286added in a future release.
287
288
289.. _oracle_returning:
290
291RETURNING Support
292-----------------
293
294The Oracle database supports a limited form of RETURNING, in order to retrieve
295result sets of matched rows from INSERT, UPDATE and DELETE statements.
296Oracle's RETURNING..INTO syntax only supports one row being returned, as it
297relies upon OUT parameters in order to function.  In addition, supported
298DBAPIs have further limitations (see :ref:`cx_oracle_returning`).
299
300SQLAlchemy's "implicit returning" feature, which employs RETURNING within an
301INSERT and sometimes an UPDATE statement in order to fetch newly generated
302primary key values and other SQL defaults and expressions, is normally enabled
303on the Oracle backend.  By default, "implicit returning" typically only
304fetches the value of a single ``nextval(some_seq)`` expression embedded into
an INSERT statement, in order to increment a sequence and get the value back
at the same time.  To disable this feature across the board,
307specify ``implicit_returning=False`` to :func:`_sa.create_engine`::
308
309    engine = create_engine("oracle://scott:tiger@dsn",
310                           implicit_returning=False)
311
312Implicit returning can also be disabled on a table-by-table basis as a table
313option::
314
315    # Core Table
316    my_table = Table("my_table", metadata, ..., implicit_returning=False)
317
318
319    # declarative
320    class MyClass(Base):
321        __tablename__ = 'my_table'
322        __table_args__ = {"implicit_returning": False}
323
324.. seealso::
325
326    :ref:`cx_oracle_returning` - additional cx_oracle-specific restrictions on
327    implicit returning.
328
329ON UPDATE CASCADE
330-----------------
331
Oracle doesn't have native ON UPDATE CASCADE functionality.  A trigger-based
333solution is available at
334https://asktom.oracle.com/tkyte/update_cascade/index.html .
335
The SQLAlchemy ORM has limited ability to manually issue cascading updates -
specify ForeignKey objects using the ``deferrable=True, initially='deferred'``
keyword arguments, and specify ``passive_updates=False`` on each
relationship().
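
A minimal sketch of this arrangement (the mapped classes and table names are
illustrative)::

    from sqlalchemy import Column, ForeignKey, Integer
    from sqlalchemy.orm import declarative_base, relationship

    Base = declarative_base()


    class Parent(Base):
        __tablename__ = 'parent'
        id = Column(Integer, primary_key=True)
        children = relationship(
            "Child",
            # have the ORM emit UPDATE statements for dependent rows
            # itself, rather than relying on the database to cascade
            passive_updates=False,
        )


    class Child(Base):
        __tablename__ = 'child'
        id = Column(Integer, primary_key=True)
        parent_id = Column(
            Integer,
            ForeignKey(
                'parent.id', deferrable=True, initially='deferred'
            ),
        )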
340
341Oracle 8 Compatibility
342----------------------
343
344When Oracle 8 is detected, the dialect internally configures itself to the
345following behaviors:
346
347* the use_ansi flag is set to False.  This has the effect of converting all
348  JOIN phrases into the WHERE clause, and in the case of LEFT OUTER JOIN
  makes use of Oracle's (+) operator (see the sketch following this list).
350
* the NVARCHAR2 and NCLOB datatypes are no longer generated as DDL when
  :class:`~sqlalchemy.types.Unicode` is used - VARCHAR2 and CLOB are
  issued instead.   This is because these types don't seem to work correctly
  on Oracle 8 even though they are available.  The
355  :class:`~sqlalchemy.types.NVARCHAR` and
356  :class:`~sqlalchemy.dialects.oracle.NCLOB` types will always generate
357  NVARCHAR2 and NCLOB.
358
359* the "native unicode" mode is disabled when using cx_oracle, i.e. SQLAlchemy
360  encodes all Python unicode objects to "string" before passing in as bind
361  parameters.
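
A sketch of the non-ANSI join rendering referred to in the first point above
(the tables are illustrative)::

    from sqlalchemy import Column, Integer, MetaData, Table, select
    from sqlalchemy.dialects import oracle

    m = MetaData()
    a = Table("a", m, Column("id", Integer))
    b = Table("b", m, Column("a_id", Integer))

    stmt = select(a).select_from(a.outerjoin(b, a.c.id == b.c.a_id))

    # renders approximately:
    # SELECT a.id FROM a, b WHERE a.id = b.a_id(+)
    print(stmt.compile(dialect=oracle.dialect(use_ansi=False)))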
362
363Synonym/DBLINK Reflection
364-------------------------
365
366When using reflection with Table objects, the dialect can optionally search
367for tables indicated by synonyms, either in local or remote schemas or
368accessed over DBLINK, by passing the flag ``oracle_resolve_synonyms=True`` as
369a keyword argument to the :class:`_schema.Table` construct::
370
371    some_table = Table('some_table', autoload_with=some_engine,
372                                oracle_resolve_synonyms=True)
373
374When this flag is set, the given name (such as ``some_table`` above) will
375be searched not just in the ``ALL_TABLES`` view, but also within the
376``ALL_SYNONYMS`` view to see if this name is actually a synonym to another
name.  If the synonym is located and refers to a DBLINK, the Oracle dialect
knows how to locate the table's information using DBLINK syntax (e.g.
379``@dblink``).
380
381``oracle_resolve_synonyms`` is accepted wherever reflection arguments are
382accepted, including methods such as :meth:`_schema.MetaData.reflect` and
383:meth:`_reflection.Inspector.get_columns`.
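
For example, a sketch of passing the flag to :meth:`_schema.MetaData.reflect`
as well as to an inspector method (the engine URL and table name are
illustrative)::

    from sqlalchemy import create_engine, inspect, MetaData

    engine = create_engine("oracle+cx_oracle://scott:tiger@dsn")

    metadata = MetaData()
    metadata.reflect(engine, oracle_resolve_synonyms=True)

    columns = inspect(engine).get_columns(
        "some_table", oracle_resolve_synonyms=True
    )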
384
385If synonyms are not in use, this flag should be left disabled.
386
387.. _oracle_constraint_reflection:
388
389Constraint Reflection
390---------------------
391
392The Oracle dialect can return information about foreign key, unique, and
393CHECK constraints, as well as indexes on tables.
394
395Raw information regarding these constraints can be acquired using
396:meth:`_reflection.Inspector.get_foreign_keys`,
397:meth:`_reflection.Inspector.get_unique_constraints`,
398:meth:`_reflection.Inspector.get_check_constraints`, and
399:meth:`_reflection.Inspector.get_indexes`.
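
A sketch of acquiring this information from an
:class:`_reflection.Inspector` (the engine URL and table name are
illustrative)::

    from sqlalchemy import create_engine, inspect

    engine = create_engine("oracle+cx_oracle://s:t@dsn")
    inspector = inspect(engine)

    fks = inspector.get_foreign_keys("some_table")
    uniques = inspector.get_unique_constraints("some_table")
    checks = inspector.get_check_constraints("some_table")
    indexes = inspector.get_indexes("some_table")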
400
401.. versionchanged:: 1.2  The Oracle dialect can now reflect UNIQUE and
402   CHECK constraints.
403
404When using reflection at the :class:`_schema.Table` level, the
405:class:`_schema.Table`
406will also include these constraints.
407
408Note the following caveats:
409
410* When using the :meth:`_reflection.Inspector.get_check_constraints` method,
411  Oracle
412  builds a special "IS NOT NULL" constraint for columns that specify
413  "NOT NULL".  This constraint is **not** returned by default; to include
414  the "IS NOT NULL" constraints, pass the flag ``include_all=True``::
415
416      from sqlalchemy import create_engine, inspect
417
418      engine = create_engine("oracle+cx_oracle://s:t@dsn")
419      inspector = inspect(engine)
420      all_check_constraints = inspector.get_check_constraints(
421          "some_table", include_all=True)
422
423* in most cases, when reflecting a :class:`_schema.Table`,
424  a UNIQUE constraint will
425  **not** be available as a :class:`.UniqueConstraint` object, as Oracle
426  mirrors unique constraints with a UNIQUE index in most cases (the exception
427  seems to be when two or more unique constraints represent the same columns);
428  the :class:`_schema.Table` will instead represent these using
429  :class:`.Index`
430  with the ``unique=True`` flag set.
431
432* Oracle creates an implicit index for the primary key of a table; this index
433  is **excluded** from all index results.
434
435* the list of columns reflected for an index will not include column names
436  that start with SYS_NC.
437
438Table names with SYSTEM/SYSAUX tablespaces
439-------------------------------------------
440
441The :meth:`_reflection.Inspector.get_table_names` and
442:meth:`_reflection.Inspector.get_temp_table_names`
443methods each return a list of table names for the current engine. These methods
444are also part of the reflection which occurs within an operation such as
445:meth:`_schema.MetaData.reflect`.  By default,
446these operations exclude the ``SYSTEM``
447and ``SYSAUX`` tablespaces from the operation.   In order to change this, the
448default list of tablespaces excluded can be changed at the engine level using
449the ``exclude_tablespaces`` parameter::
450
451    # exclude SYSAUX and SOME_TABLESPACE, but not SYSTEM
452    e = create_engine(
453      "oracle://scott:tiger@xe",
454      exclude_tablespaces=["SYSAUX", "SOME_TABLESPACE"])
455
456.. versionadded:: 1.1
457
458DateTime Compatibility
459----------------------
460
Oracle has no datatype known as ``DATETIME``; it instead has only ``DATE``,
462which can actually store a date and time value.  For this reason, the Oracle
463dialect provides a type :class:`_oracle.DATE` which is a subclass of
464:class:`.DateTime`.   This type has no special behavior, and is only
465present as a "marker" for this type; additionally, when a database column
466is reflected and the type is reported as ``DATE``, the time-supporting
467:class:`_oracle.DATE` type is used.
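
As a brief sketch, the generic :class:`.DateTime` type compiles to Oracle's
``DATE`` when using this dialect::

    from sqlalchemy import types
    from sqlalchemy.dialects import oracle

    print(types.DateTime().compile(dialect=oracle.dialect()))
    # expected output: DATE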
468
469.. versionchanged:: 0.9.4 Added :class:`_oracle.DATE` to subclass
470   :class:`.DateTime`.  This is a change as previous versions
471   would reflect a ``DATE`` column as :class:`_types.DATE`, which subclasses
472   :class:`.Date`.   The only significance here is for schemes that are
473   examining the type of column for use in special Python translations or
474   for migrating schemas to other database backends.
475
476.. _oracle_table_options:
477
478Oracle Table Options
479-------------------------
480
481The CREATE TABLE phrase supports the following options with Oracle
482in conjunction with the :class:`_schema.Table` construct:
483
484
485* ``ON COMMIT``::
486
487    Table(
488        "some_table", metadata, ...,
489        prefixes=['GLOBAL TEMPORARY'], oracle_on_commit='PRESERVE ROWS')
490
491.. versionadded:: 1.0.0
492
493* ``COMPRESS``::
494
495    Table('mytable', metadata, Column('data', String(32)),
496        oracle_compress=True)
497
498    Table('mytable', metadata, Column('data', String(32)),
499        oracle_compress=6)
500
  The ``oracle_compress`` parameter accepts either an integer compression
  level, or ``True`` to use the default compression level.
503
504.. versionadded:: 1.0.0
505
506.. _oracle_index_options:
507
508Oracle Specific Index Options
509-----------------------------
510
511Bitmap Indexes
512~~~~~~~~~~~~~~
513
514You can specify the ``oracle_bitmap`` parameter to create a bitmap index
515instead of a B-tree index::
516
517    Index('my_index', my_table.c.data, oracle_bitmap=True)
518
Bitmap indexes cannot be unique and cannot be compressed. SQLAlchemy will not
check for such limitations; only the database will.
521
522.. versionadded:: 1.0.0
523
524Index compression
525~~~~~~~~~~~~~~~~~
526
527Oracle has a more efficient storage mode for indexes containing lots of
528repeated values. Use the ``oracle_compress`` parameter to turn on key
529compression::
530
531    Index('my_index', my_table.c.data, oracle_compress=True)
532
533    Index('my_index', my_table.c.data1, my_table.c.data2, unique=True,
534           oracle_compress=1)
535
536The ``oracle_compress`` parameter accepts either an integer specifying the
537number of prefix columns to compress, or ``True`` to use the default (all
538columns for non-unique indexes, all but the last column for unique indexes).
539
540.. versionadded:: 1.0.0
541
542"""  # noqa
543
544from itertools import groupby
545import re
546
547from ... import Computed
548from ... import exc
549from ... import schema as sa_schema
550from ... import sql
551from ... import util
552from ...engine import default
553from ...engine import reflection
554from ...sql import compiler
555from ...sql import expression
556from ...sql import sqltypes
557from ...sql import util as sql_util
558from ...sql import visitors
559from ...types import BLOB
560from ...types import CHAR
561from ...types import CLOB
562from ...types import FLOAT
563from ...types import INTEGER
564from ...types import NCHAR
565from ...types import NVARCHAR
566from ...types import TIMESTAMP
567from ...types import VARCHAR
568from ...util import compat
569
570RESERVED_WORDS = set(
571    "SHARE RAW DROP BETWEEN FROM DESC OPTION PRIOR LONG THEN "
572    "DEFAULT ALTER IS INTO MINUS INTEGER NUMBER GRANT IDENTIFIED "
573    "ALL TO ORDER ON FLOAT DATE HAVING CLUSTER NOWAIT RESOURCE "
574    "ANY TABLE INDEX FOR UPDATE WHERE CHECK SMALLINT WITH DELETE "
575    "BY ASC REVOKE LIKE SIZE RENAME NOCOMPRESS NULL GROUP VALUES "
576    "AS IN VIEW EXCLUSIVE COMPRESS SYNONYM SELECT INSERT EXISTS "
577    "NOT TRIGGER ELSE CREATE INTERSECT PCTFREE DISTINCT USER "
578    "CONNECT SET MODE OF UNIQUE VARCHAR2 VARCHAR LOCK OR CHAR "
579    "DECIMAL UNION PUBLIC AND START UID COMMENT CURRENT LEVEL".split()
580)
581
582NO_ARG_FNS = set(
583    "UID CURRENT_DATE SYSDATE USER " "CURRENT_TIME CURRENT_TIMESTAMP".split()
584)
585
586
587class RAW(sqltypes._Binary):
588    __visit_name__ = "RAW"
589
590
591OracleRaw = RAW
592
593
594class NCLOB(sqltypes.Text):
595    __visit_name__ = "NCLOB"
596
597
598class VARCHAR2(VARCHAR):
599    __visit_name__ = "VARCHAR2"
600
601
602NVARCHAR2 = NVARCHAR
603
604
605class NUMBER(sqltypes.Numeric, sqltypes.Integer):
606    __visit_name__ = "NUMBER"
607
608    def __init__(self, precision=None, scale=None, asdecimal=None):
609        if asdecimal is None:
610            asdecimal = bool(scale and scale > 0)
611
612        super(NUMBER, self).__init__(
613            precision=precision, scale=scale, asdecimal=asdecimal
614        )
615
616    def adapt(self, impltype):
617        ret = super(NUMBER, self).adapt(impltype)
618        # leave a hint for the DBAPI handler
619        ret._is_oracle_number = True
620        return ret
621
622    @property
623    def _type_affinity(self):
624        if bool(self.scale and self.scale > 0):
625            return sqltypes.Numeric
626        else:
627            return sqltypes.Integer
628
629
630class DOUBLE_PRECISION(sqltypes.Float):
631    __visit_name__ = "DOUBLE_PRECISION"
632
633
634class BINARY_DOUBLE(sqltypes.Float):
635    __visit_name__ = "BINARY_DOUBLE"
636
637
638class BINARY_FLOAT(sqltypes.Float):
639    __visit_name__ = "BINARY_FLOAT"
640
641
642class BFILE(sqltypes.LargeBinary):
643    __visit_name__ = "BFILE"
644
645
646class LONG(sqltypes.Text):
647    __visit_name__ = "LONG"
648
649
650class DATE(sqltypes.DateTime):
651    """Provide the oracle DATE type.
652
653    This type has no special Python behavior, except that it subclasses
654    :class:`_types.DateTime`; this is to suit the fact that the Oracle
655    ``DATE`` type supports a time value.
656
657    .. versionadded:: 0.9.4
658
659    """
660
661    __visit_name__ = "DATE"
662
663    def _compare_type_affinity(self, other):
664        return other._type_affinity in (sqltypes.DateTime, sqltypes.Date)
665
666
667class INTERVAL(sqltypes.NativeForEmulated, sqltypes._AbstractInterval):
668    __visit_name__ = "INTERVAL"
669
670    def __init__(self, day_precision=None, second_precision=None):
671        """Construct an INTERVAL.
672
673        Note that only DAY TO SECOND intervals are currently supported.
674        This is due to a lack of support for YEAR TO MONTH intervals
675        within available DBAPIs.
676
677        :param day_precision: the day precision value.  this is the number of
678          digits to store for the day field.  Defaults to "2"
679        :param second_precision: the second precision value.  this is the
680          number of digits to store for the fractional seconds field.
681          Defaults to "6".
682
683        """
684        self.day_precision = day_precision
685        self.second_precision = second_precision
686
687    @classmethod
688    def _adapt_from_generic_interval(cls, interval):
689        return INTERVAL(
690            day_precision=interval.day_precision,
691            second_precision=interval.second_precision,
692        )
693
694    @property
695    def _type_affinity(self):
696        return sqltypes.Interval
697
698    def as_generic(self, allow_nulltype=False):
699        return sqltypes.Interval(
700            native=True,
701            second_precision=self.second_precision,
702            day_precision=self.day_precision,
703        )
704
705    def coerce_compared_value(self, op, value):
706        return self
707
708
709class ROWID(sqltypes.TypeEngine):
710    """Oracle ROWID type.
711
712    When used in a cast() or similar, generates ROWID.
713
714    """
715
716    __visit_name__ = "ROWID"
717
718
719class _OracleBoolean(sqltypes.Boolean):
720    def get_dbapi_type(self, dbapi):
721        return dbapi.NUMBER
722
723
724colspecs = {
725    sqltypes.Boolean: _OracleBoolean,
726    sqltypes.Interval: INTERVAL,
727    sqltypes.DateTime: DATE,
728}
729
730ischema_names = {
731    "VARCHAR2": VARCHAR,
732    "NVARCHAR2": NVARCHAR,
733    "CHAR": CHAR,
734    "NCHAR": NCHAR,
735    "DATE": DATE,
736    "NUMBER": NUMBER,
737    "BLOB": BLOB,
738    "BFILE": BFILE,
739    "CLOB": CLOB,
740    "NCLOB": NCLOB,
741    "TIMESTAMP": TIMESTAMP,
742    "TIMESTAMP WITH TIME ZONE": TIMESTAMP,
743    "INTERVAL DAY TO SECOND": INTERVAL,
744    "RAW": RAW,
745    "FLOAT": FLOAT,
746    "DOUBLE PRECISION": DOUBLE_PRECISION,
747    "LONG": LONG,
748    "BINARY_DOUBLE": BINARY_DOUBLE,
749    "BINARY_FLOAT": BINARY_FLOAT,
750}
751
752
753class OracleTypeCompiler(compiler.GenericTypeCompiler):
754    # Note:
755    # Oracle DATE == DATETIME
756    # Oracle does not allow milliseconds in DATE
757    # Oracle does not support TIME columns
758
759    def visit_datetime(self, type_, **kw):
760        return self.visit_DATE(type_, **kw)
761
762    def visit_float(self, type_, **kw):
763        return self.visit_FLOAT(type_, **kw)
764
765    def visit_unicode(self, type_, **kw):
766        if self.dialect._use_nchar_for_unicode:
767            return self.visit_NVARCHAR2(type_, **kw)
768        else:
769            return self.visit_VARCHAR2(type_, **kw)
770
771    def visit_INTERVAL(self, type_, **kw):
772        return "INTERVAL DAY%s TO SECOND%s" % (
773            type_.day_precision is not None
774            and "(%d)" % type_.day_precision
775            or "",
776            type_.second_precision is not None
777            and "(%d)" % type_.second_precision
778            or "",
779        )
780
781    def visit_LONG(self, type_, **kw):
782        return "LONG"
783
784    def visit_TIMESTAMP(self, type_, **kw):
785        if type_.timezone:
786            return "TIMESTAMP WITH TIME ZONE"
787        else:
788            return "TIMESTAMP"
789
790    def visit_DOUBLE_PRECISION(self, type_, **kw):
791        return self._generate_numeric(type_, "DOUBLE PRECISION", **kw)
792
793    def visit_BINARY_DOUBLE(self, type_, **kw):
794        return self._generate_numeric(type_, "BINARY_DOUBLE", **kw)
795
796    def visit_BINARY_FLOAT(self, type_, **kw):
797        return self._generate_numeric(type_, "BINARY_FLOAT", **kw)
798
799    def visit_FLOAT(self, type_, **kw):
800        # don't support conversion between decimal/binary
801        # precision yet
802        kw["no_precision"] = True
803        return self._generate_numeric(type_, "FLOAT", **kw)
804
805    def visit_NUMBER(self, type_, **kw):
806        return self._generate_numeric(type_, "NUMBER", **kw)
807
808    def _generate_numeric(
809        self, type_, name, precision=None, scale=None, no_precision=False, **kw
810    ):
811        if precision is None:
812            precision = type_.precision
813
814        if scale is None:
815            scale = getattr(type_, "scale", None)
816
817        if no_precision or precision is None:
818            return name
819        elif scale is None:
820            n = "%(name)s(%(precision)s)"
821            return n % {"name": name, "precision": precision}
822        else:
823            n = "%(name)s(%(precision)s, %(scale)s)"
824            return n % {"name": name, "precision": precision, "scale": scale}
825
826    def visit_string(self, type_, **kw):
827        return self.visit_VARCHAR2(type_, **kw)
828
829    def visit_VARCHAR2(self, type_, **kw):
830        return self._visit_varchar(type_, "", "2")
831
832    def visit_NVARCHAR2(self, type_, **kw):
833        return self._visit_varchar(type_, "N", "2")
834
835    visit_NVARCHAR = visit_NVARCHAR2
836
837    def visit_VARCHAR(self, type_, **kw):
838        return self._visit_varchar(type_, "", "")
839
840    def _visit_varchar(self, type_, n, num):
841        if not type_.length:
842            return "%(n)sVARCHAR%(two)s" % {"two": num, "n": n}
843        elif not n and self.dialect._supports_char_length:
844            varchar = "VARCHAR%(two)s(%(length)s CHAR)"
845            return varchar % {"length": type_.length, "two": num}
846        else:
847            varchar = "%(n)sVARCHAR%(two)s(%(length)s)"
848            return varchar % {"length": type_.length, "two": num, "n": n}
849
850    def visit_text(self, type_, **kw):
851        return self.visit_CLOB(type_, **kw)
852
853    def visit_unicode_text(self, type_, **kw):
854        if self.dialect._use_nchar_for_unicode:
855            return self.visit_NCLOB(type_, **kw)
856        else:
857            return self.visit_CLOB(type_, **kw)
858
859    def visit_large_binary(self, type_, **kw):
860        return self.visit_BLOB(type_, **kw)
861
862    def visit_big_integer(self, type_, **kw):
863        return self.visit_NUMBER(type_, precision=19, **kw)
864
865    def visit_boolean(self, type_, **kw):
866        return self.visit_SMALLINT(type_, **kw)
867
868    def visit_RAW(self, type_, **kw):
869        if type_.length:
870            return "RAW(%(length)s)" % {"length": type_.length}
871        else:
872            return "RAW"
873
874    def visit_ROWID(self, type_, **kw):
875        return "ROWID"
876
877
878class OracleCompiler(compiler.SQLCompiler):
879    """Oracle compiler modifies the lexical structure of Select
880    statements to work under non-ANSI configured Oracle databases, if
881    the use_ansi flag is False.
882    """
883
884    compound_keywords = util.update_copy(
885        compiler.SQLCompiler.compound_keywords,
886        {expression.CompoundSelect.EXCEPT: "MINUS"},
887    )
888
889    def __init__(self, *args, **kwargs):
890        self.__wheres = {}
891        super(OracleCompiler, self).__init__(*args, **kwargs)
892
893    def visit_mod_binary(self, binary, operator, **kw):
894        return "mod(%s, %s)" % (
895            self.process(binary.left, **kw),
896            self.process(binary.right, **kw),
897        )
898
899    def visit_now_func(self, fn, **kw):
900        return "CURRENT_TIMESTAMP"
901
902    def visit_char_length_func(self, fn, **kw):
903        return "LENGTH" + self.function_argspec(fn, **kw)
904
905    def visit_match_op_binary(self, binary, operator, **kw):
906        return "CONTAINS (%s, %s)" % (
907            self.process(binary.left),
908            self.process(binary.right),
909        )
910
911    def visit_true(self, expr, **kw):
912        return "1"
913
914    def visit_false(self, expr, **kw):
915        return "0"
916
917    def get_cte_preamble(self, recursive):
918        return "WITH"
919
920    def get_select_hint_text(self, byfroms):
921        return " ".join("/*+ %s */" % text for table, text in byfroms.items())
922
923    def function_argspec(self, fn, **kw):
924        if len(fn.clauses) > 0 or fn.name.upper() not in NO_ARG_FNS:
925            return compiler.SQLCompiler.function_argspec(self, fn, **kw)
926        else:
927            return ""
928
929    def visit_function(self, func, **kw):
930        text = super(OracleCompiler, self).visit_function(func, **kw)
931        if kw.get("asfrom", False):
932            text = "TABLE (%s)" % func
933        return text
934
935    def visit_table_valued_column(self, element, **kw):
936        text = super(OracleCompiler, self).visit_table_valued_column(
937            element, **kw
938        )
939        text = "COLUMN_VALUE " + text
940        return text
941
942    def default_from(self):
943        """Called when a ``SELECT`` statement has no froms,
944        and no ``FROM`` clause is to be appended.
945
946        The Oracle compiler tacks a "FROM DUAL" to the statement.
947        """
948
949        return " FROM DUAL"
950
951    def visit_join(self, join, from_linter=None, **kwargs):
952        if self.dialect.use_ansi:
953            return compiler.SQLCompiler.visit_join(
954                self, join, from_linter=from_linter, **kwargs
955            )
956        else:
957            if from_linter:
958                from_linter.edges.add((join.left, join.right))
959
960            kwargs["asfrom"] = True
961            if isinstance(join.right, expression.FromGrouping):
962                right = join.right.element
963            else:
964                right = join.right
965            return (
966                self.process(join.left, from_linter=from_linter, **kwargs)
967                + ", "
968                + self.process(right, from_linter=from_linter, **kwargs)
969            )
970
971    def _get_nonansi_join_whereclause(self, froms):
972        clauses = []
973
974        def visit_join(join):
975            if join.isouter:
976                # https://docs.oracle.com/database/121/SQLRF/queries006.htm#SQLRF52354
977                # "apply the outer join operator (+) to all columns of B in
978                # the join condition in the WHERE clause" - that is,
979                # unconditionally regardless of operator or the other side
980                def visit_binary(binary):
981                    if isinstance(
982                        binary.left, expression.ColumnClause
983                    ) and join.right.is_derived_from(binary.left.table):
984                        binary.left = _OuterJoinColumn(binary.left)
985                    elif isinstance(
986                        binary.right, expression.ColumnClause
987                    ) and join.right.is_derived_from(binary.right.table):
988                        binary.right = _OuterJoinColumn(binary.right)
989
990                clauses.append(
991                    visitors.cloned_traverse(
992                        join.onclause, {}, {"binary": visit_binary}
993                    )
994                )
995            else:
996                clauses.append(join.onclause)
997
998            for j in join.left, join.right:
999                if isinstance(j, expression.Join):
1000                    visit_join(j)
1001                elif isinstance(j, expression.FromGrouping):
1002                    visit_join(j.element)
1003
1004        for f in froms:
1005            if isinstance(f, expression.Join):
1006                visit_join(f)
1007
1008        if not clauses:
1009            return None
1010        else:
1011            return sql.and_(*clauses)
1012
1013    def visit_outer_join_column(self, vc, **kw):
1014        return self.process(vc.column, **kw) + "(+)"
1015
1016    def visit_sequence(self, seq, **kw):
1017        return self.preparer.format_sequence(seq) + ".nextval"
1018
1019    def get_render_as_alias_suffix(self, alias_name_text):
1020        """Oracle doesn't like ``FROM table AS alias``"""
1021
1022        return " " + alias_name_text
1023
1024    def returning_clause(self, stmt, returning_cols):
1025        columns = []
1026        binds = []
1027
1028        for i, column in enumerate(
1029            expression._select_iterables(returning_cols)
1030        ):
1031            if (
1032                self.isupdate
1033                and isinstance(column, sa_schema.Column)
1034                and isinstance(column.server_default, Computed)
1035                and not self.dialect._supports_update_returning_computed_cols
1036            ):
1037                util.warn(
1038                    "Computed columns don't work with Oracle UPDATE "
1039                    "statements that use RETURNING; the value of the column "
1040                    "*before* the UPDATE takes place is returned.   It is "
1041                    "advised to not use RETURNING with an Oracle computed "
1042                    "column.  Consider setting implicit_returning to False on "
1043                    "the Table object in order to avoid implicit RETURNING "
1044                    "clauses from being generated for this Table."
1045                )
1046            if column.type._has_column_expression:
1047                col_expr = column.type.column_expression(column)
1048            else:
1049                col_expr = column
1050
1051            outparam = sql.outparam("ret_%d" % i, type_=column.type)
1052            self.binds[outparam.key] = outparam
1053            binds.append(
1054                self.bindparam_string(self._truncate_bindparam(outparam))
1055            )
1056
1057            # ensure the ExecutionContext.get_out_parameters() method is
1058            # *not* called; the cx_Oracle dialect wants to handle these
1059            # parameters separately
1060            self.has_out_parameters = False
1061
1062            columns.append(self.process(col_expr, within_columns_clause=False))
1063
1064            self._add_to_result_map(
1065                getattr(col_expr, "name", col_expr._anon_name_label),
1066                getattr(col_expr, "name", col_expr._anon_name_label),
1067                (
1068                    column,
1069                    getattr(column, "name", None),
1070                    getattr(column, "key", None),
1071                ),
1072                column.type,
1073            )
1074
1075        return "RETURNING " + ", ".join(columns) + " INTO " + ", ".join(binds)
1076
1077    def translate_select_structure(self, select_stmt, **kwargs):
1078        select = select_stmt
1079
1080        if not getattr(select, "_oracle_visit", None):
1081            if not self.dialect.use_ansi:
1082                froms = self._display_froms_for_select(
1083                    select, kwargs.get("asfrom", False)
1084                )
1085                whereclause = self._get_nonansi_join_whereclause(froms)
1086                if whereclause is not None:
1087                    select = select.where(whereclause)
1088                    select._oracle_visit = True
1089
1090            # if fetch is used this is not needed
1091            if (
1092                select._has_row_limiting_clause
1093                and select._fetch_clause is None
1094            ):
1095                limit_clause = select._limit_clause
1096                offset_clause = select._offset_clause
1097
1098                if select._simple_int_clause(limit_clause):
1099                    limit_clause = limit_clause.render_literal_execute()
1100
1101                if select._simple_int_clause(offset_clause):
1102                    offset_clause = offset_clause.render_literal_execute()
1103
1104                # currently using form at:
1105                # https://blogs.oracle.com/oraclemagazine/\
1106                # on-rownum-and-limiting-results
1107
1108                orig_select = select
1109                select = select._generate()
1110                select._oracle_visit = True
1111
1112                # add expressions to accommodate FOR UPDATE OF
1113                for_update = select._for_update_arg
1114                if for_update is not None and for_update.of:
1115                    for_update = for_update._clone()
1116                    for_update._copy_internals()
1117
1118                    for elem in for_update.of:
1119                        if not select.selected_columns.contains_column(elem):
1120                            select = select.add_columns(elem)
1121
1122                # Wrap the middle select and add the hint
1123                inner_subquery = select.alias()
1124                limitselect = sql.select(
1125                    *[
1126                        c
1127                        for c in inner_subquery.c
1128                        if orig_select.selected_columns.corresponding_column(c)
1129                        is not None
1130                    ]
1131                )
1132
1133                if (
1134                    limit_clause is not None
1135                    and self.dialect.optimize_limits
1136                    and select._simple_int_clause(limit_clause)
1137                ):
1138                    limitselect = limitselect.prefix_with(
1139                        expression.text(
1140                            "/*+ FIRST_ROWS(%s) */"
1141                            % self.process(limit_clause, **kwargs)
1142                        )
1143                    )
1144
1145                limitselect._oracle_visit = True
1146                limitselect._is_wrapper = True
1147
1148                # add expressions to accommodate FOR UPDATE OF
1149                if for_update is not None and for_update.of:
1150
1151                    adapter = sql_util.ClauseAdapter(inner_subquery)
1152                    for_update.of = [
1153                        adapter.traverse(elem) for elem in for_update.of
1154                    ]
1155
1156                # If needed, add the limiting clause
1157                if limit_clause is not None:
1158                    if select._simple_int_clause(limit_clause) and (
1159                        offset_clause is None
1160                        or select._simple_int_clause(offset_clause)
1161                    ):
1162                        max_row = limit_clause
1163
1164                        if offset_clause is not None:
1165                            max_row = max_row + offset_clause
1166
1167                    else:
1168                        max_row = limit_clause
1169
1170                        if offset_clause is not None:
1171                            max_row = max_row + offset_clause
1172                    limitselect = limitselect.where(
1173                        sql.literal_column("ROWNUM") <= max_row
1174                    )
1175
1176                # If needed, add the ora_rn, and wrap again with offset.
1177                if offset_clause is None:
1178                    limitselect._for_update_arg = for_update
1179                    select = limitselect
1180                else:
1181                    limitselect = limitselect.add_columns(
1182                        sql.literal_column("ROWNUM").label("ora_rn")
1183                    )
1184                    limitselect._oracle_visit = True
1185                    limitselect._is_wrapper = True
1186
1187                    if for_update is not None and for_update.of:
1188                        limitselect_cols = limitselect.selected_columns
1189                        for elem in for_update.of:
1190                            if (
1191                                limitselect_cols.corresponding_column(elem)
1192                                is None
1193                            ):
1194                                limitselect = limitselect.add_columns(elem)
1195
1196                    limit_subquery = limitselect.alias()
1197                    origselect_cols = orig_select.selected_columns
1198                    offsetselect = sql.select(
1199                        *[
1200                            c
1201                            for c in limit_subquery.c
1202                            if origselect_cols.corresponding_column(c)
1203                            is not None
1204                        ]
1205                    )
1206
1207                    offsetselect._oracle_visit = True
1208                    offsetselect._is_wrapper = True
1209
1210                    if for_update is not None and for_update.of:
1211                        adapter = sql_util.ClauseAdapter(limit_subquery)
1212                        for_update.of = [
1213                            adapter.traverse(elem) for elem in for_update.of
1214                        ]
1215
1216                    offsetselect = offsetselect.where(
1217                        sql.literal_column("ora_rn") > offset_clause
1218                    )
1219
1220                    offsetselect._for_update_arg = for_update
1221                    select = offsetselect
1222
1223        return select
1224
1225    def limit_clause(self, select, **kw):
1226        return ""
1227
1228    def visit_empty_set_expr(self, type_):
1229        return "SELECT 1 FROM DUAL WHERE 1!=1"
1230
1231    def for_update_clause(self, select, **kw):
1232        if self.is_subquery():
1233            return ""
1234
1235        tmp = " FOR UPDATE"
1236
1237        if select._for_update_arg.of:
1238            tmp += " OF " + ", ".join(
1239                self.process(elem, **kw) for elem in select._for_update_arg.of
1240            )
1241
1242        if select._for_update_arg.nowait:
1243            tmp += " NOWAIT"
1244        if select._for_update_arg.skip_locked:
1245            tmp += " SKIP LOCKED"
1246
1247        return tmp
1248
1249    def visit_is_distinct_from_binary(self, binary, operator, **kw):
1250        return "DECODE(%s, %s, 0, 1) = 1" % (
1251            self.process(binary.left),
1252            self.process(binary.right),
1253        )
1254
1255    def visit_is_not_distinct_from_binary(self, binary, operator, **kw):
1256        return "DECODE(%s, %s, 0, 1) = 0" % (
1257            self.process(binary.left),
1258            self.process(binary.right),
1259        )
1260
1261    def _get_regexp_args(self, binary, kw):
1262        string = self.process(binary.left, **kw)
1263        pattern = self.process(binary.right, **kw)
1264        flags = binary.modifiers["flags"]
1265        if flags is not None:
1266            flags = self.process(flags, **kw)
1267        return string, pattern, flags
1268
1269    def visit_regexp_match_op_binary(self, binary, operator, **kw):
1270        string, pattern, flags = self._get_regexp_args(binary, kw)
1271        if flags is None:
1272            return "REGEXP_LIKE(%s, %s)" % (string, pattern)
1273        else:
1274            return "REGEXP_LIKE(%s, %s, %s)" % (string, pattern, flags)
1275
1276    def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
1277        return "NOT %s" % self.visit_regexp_match_op_binary(
1278            binary, operator, **kw
1279        )
1280
1281    def visit_regexp_replace_op_binary(self, binary, operator, **kw):
1282        string, pattern, flags = self._get_regexp_args(binary, kw)
1283        replacement = self.process(binary.modifiers["replacement"], **kw)
1284        if flags is None:
1285            return "REGEXP_REPLACE(%s, %s, %s)" % (
1286                string,
1287                pattern,
1288                replacement,
1289            )
1290        else:
1291            return "REGEXP_REPLACE(%s, %s, %s, %s)" % (
1292                string,
1293                pattern,
1294                replacement,
1295                flags,
1296            )
1297
1298
1299class OracleDDLCompiler(compiler.DDLCompiler):
1300    def define_constraint_cascades(self, constraint):
1301        text = ""
1302        if constraint.ondelete is not None:
1303            text += " ON DELETE %s" % constraint.ondelete
1304
1305        # oracle has no ON UPDATE CASCADE -
1306        # its only available via triggers
1307        # https://asktom.oracle.com/tkyte/update_cascade/index.html
1308        if constraint.onupdate is not None:
1309            util.warn(
1310                "Oracle does not contain native UPDATE CASCADE "
1311                "functionality - onupdates will not be rendered for foreign "
1312                "keys.  Consider using deferrable=True, initially='deferred' "
1313                "or triggers."
1314            )
1315
1316        return text
1317
1318    def visit_drop_table_comment(self, drop):
1319        return "COMMENT ON TABLE %s IS ''" % self.preparer.format_table(
1320            drop.element
1321        )
1322
1323    def visit_create_index(self, create):
1324        index = create.element
1325        self._verify_index_table(index)
1326        preparer = self.preparer
1327        text = "CREATE "
1328        if index.unique:
1329            text += "UNIQUE "
1330        if index.dialect_options["oracle"]["bitmap"]:
1331            text += "BITMAP "
1332        text += "INDEX %s ON %s (%s)" % (
1333            self._prepared_index_name(index, include_schema=True),
1334            preparer.format_table(index.table, use_schema=True),
1335            ", ".join(
1336                self.sql_compiler.process(
1337                    expr, include_table=False, literal_binds=True
1338                )
1339                for expr in index.expressions
1340            ),
1341        )
1342        if index.dialect_options["oracle"]["compress"] is not False:
1343            if index.dialect_options["oracle"]["compress"] is True:
1344                text += " COMPRESS"
1345            else:
1346                text += " COMPRESS %d" % (
1347                    index.dialect_options["oracle"]["compress"]
1348                )
1349        return text
1350
1351    def post_create_table(self, table):
1352        table_opts = []
1353        opts = table.dialect_options["oracle"]
1354
1355        if opts["on_commit"]:
1356            on_commit_options = opts["on_commit"].replace("_", " ").upper()
1357            table_opts.append("\n ON COMMIT %s" % on_commit_options)
1358
1359        if opts["compress"]:
1360            if opts["compress"] is True:
1361                table_opts.append("\n COMPRESS")
1362            else:
1363                table_opts.append("\n COMPRESS FOR %s" % (opts["compress"]))
1364
1365        return "".join(table_opts)
1366
1367    def get_identity_options(self, identity_options):
1368        text = super(OracleDDLCompiler, self).get_identity_options(
1369            identity_options
1370        )
1371        text = text.replace("NO MINVALUE", "NOMINVALUE")
1372        text = text.replace("NO MAXVALUE", "NOMAXVALUE")
1373        text = text.replace("NO CYCLE", "NOCYCLE")
1374        text = text.replace("NO ORDER", "NOORDER")
1375        return text
1376
1377    def visit_computed_column(self, generated):
1378        text = "GENERATED ALWAYS AS (%s)" % self.sql_compiler.process(
1379            generated.sqltext, include_table=False, literal_binds=True
1380        )
1381        if generated.persisted is True:
1382            raise exc.CompileError(
1383                "Oracle computed columns do not support 'stored' persistence; "
1384                "set the 'persisted' flag to None or False for Oracle support."
1385            )
1386        elif generated.persisted is False:
1387            text += " VIRTUAL"
1388        return text
1389
1390    def visit_identity_column(self, identity, **kw):
1391        if identity.always is None:
1392            kind = ""
1393        else:
1394            kind = "ALWAYS" if identity.always else "BY DEFAULT"
1395        text = "GENERATED %s" % kind
1396        if identity.on_null:
1397            text += " ON NULL"
1398        text += " AS IDENTITY"
1399        options = self.get_identity_options(identity)
1400        if options:
1401            text += " (%s)" % options
1402        return text
1403
1404
1405class OracleIdentifierPreparer(compiler.IdentifierPreparer):
1406
1407    reserved_words = {x.lower() for x in RESERVED_WORDS}
1408    illegal_initial_characters = {str(dig) for dig in range(0, 10)}.union(
1409        ["_", "$"]
1410    )
1411
1412    def _bindparam_requires_quotes(self, value):
1413        """Return True if the given identifier requires quoting."""
1414        lc_value = value.lower()
1415        return (
1416            lc_value in self.reserved_words
1417            or value[0] in self.illegal_initial_characters
1418            or not self.legal_characters.match(util.text_type(value))
1419        )
1420
1421    def format_savepoint(self, savepoint):
1422        name = savepoint.ident.lstrip("_")
1423        return super(OracleIdentifierPreparer, self).format_savepoint(
1424            savepoint, name
1425        )
1426
1427
1428class OracleExecutionContext(default.DefaultExecutionContext):
1429    def fire_sequence(self, seq, type_):
1430        return self._execute_scalar(
1431            "SELECT "
1432            + self.identifier_preparer.format_sequence(seq)
1433            + ".nextval FROM DUAL",
1434            type_,
1435        )
1436
1437
1438class OracleDialect(default.DefaultDialect):
1439    name = "oracle"
1440    supports_statement_cache = True
1441    supports_alter = True
1442    supports_unicode_statements = False
1443    supports_unicode_binds = False
1444    max_identifier_length = 128
1445
1446    supports_simple_order_by_label = False
1447    cte_follows_insert = True
1448
1449    supports_sequences = True
1450    sequences_optional = False
1451    postfetch_lastrowid = False
1452
1453    default_paramstyle = "named"
1454    colspecs = colspecs
1455    ischema_names = ischema_names
1456    requires_name_normalize = True
1457
1458    supports_comments = True
1459
1460    supports_default_values = False
1461    supports_default_metavalue = True
1462    supports_empty_insert = False
1463    supports_identity_columns = True
1464
1465    statement_compiler = OracleCompiler
1466    ddl_compiler = OracleDDLCompiler
1467    type_compiler = OracleTypeCompiler
1468    preparer = OracleIdentifierPreparer
1469    execution_ctx_cls = OracleExecutionContext
1470
1471    reflection_options = ("oracle_resolve_synonyms",)
1472
1473    _use_nchar_for_unicode = False
1474
1475    construct_arguments = [
1476        (
1477            sa_schema.Table,
1478            {"resolve_synonyms": False, "on_commit": None, "compress": False},
1479        ),
1480        (sa_schema.Index, {"bitmap": False, "compress": False}),
1481    ]
1482
1483    @util.deprecated_params(
1484        use_binds_for_limits=(
1485            "1.4",
1486            "The ``use_binds_for_limits`` Oracle dialect parameter is "
1487            "deprecated. The dialect now renders LIMIT / OFFSET integers "
1488            "inline in all cases using a post-compilation hook, so that the "
1489            "value is still represented by a 'bound parameter' on the Core "
1490            "Expression side.",
1491        )
1492    )
1493    def __init__(
1494        self,
1495        use_ansi=True,
1496        optimize_limits=False,
1497        use_binds_for_limits=None,
1498        use_nchar_for_unicode=False,
1499        exclude_tablespaces=("SYSTEM", "SYSAUX"),
1500        **kwargs
1501    ):
1502        default.DefaultDialect.__init__(self, **kwargs)
1503        self._use_nchar_for_unicode = use_nchar_for_unicode
1504        self.use_ansi = use_ansi
1505        self.optimize_limits = optimize_limits
1506        self.exclude_tablespaces = exclude_tablespaces
1507
1508    def initialize(self, connection):
1509        super(OracleDialect, self).initialize(connection)
1510
1511        self.implicit_returning = self.__dict__.get(
1512            "implicit_returning", self.server_version_info > (10,)
1513        )
1514
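        # Oracle 8 predates both the ANSI join syntax and the INTERVAL
        # datatypes, so fall back to the (+) operator and drop the Interval
        # colspec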
1515        if self._is_oracle_8:
1516            self.colspecs = self.colspecs.copy()
1517            self.colspecs.pop(sqltypes.Interval)
1518            self.use_ansi = False
1519
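        # GENERATED ... AS IDENTITY columns were introduced in Oracle 12c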
1520        self.supports_identity_columns = self.server_version_info >= (12,)
1521
1522    def _get_effective_compat_server_version_info(self, connection):
1523        # dialect does not need compat levels below 12.2, so don't query
1524        # in those cases
1525
1526        if self.server_version_info < (12, 2):
1527            return self.server_version_info
1528        try:
1529            compat = connection.exec_driver_sql(
1530                "SELECT value FROM v$parameter WHERE name = 'compatible'"
1531            ).scalar()
1532        except exc.DBAPIError:
1533            compat = None
1534
1535        if compat:
1536            try:
1537                return tuple(int(x) for x in compat.split("."))
1538            except ValueError:
1539                return self.server_version_info
1540        else:
1541            return self.server_version_info
1542
1543    @property
1544    def _is_oracle_8(self):
1545        return self.server_version_info and self.server_version_info < (9,)
1546
1547    @property
1548    def _supports_table_compression(self):
1549        return self.server_version_info and self.server_version_info >= (10, 1)
1550
1551    @property
1552    def _supports_table_compress_for(self):
1553        return self.server_version_info and self.server_version_info >= (11,)
1554
1555    @property
1556    def _supports_char_length(self):
1557        return not self._is_oracle_8
1558
1559    @property
1560    def _supports_update_returning_computed_cols(self):
1561        # this error is no longer present on version 18, while it still
1562        # occurs on 11; it may also work on some versions earlier than 18
1563        return self.server_version_info and self.server_version_info >= (18,)
1564
1565    def do_release_savepoint(self, connection, name):
1566        # Oracle does not support RELEASE SAVEPOINT
1567        pass
1568
1569    def _check_max_identifier_length(self, connection):
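        # Oracle 12.2 raised the identifier length limit from 30 to 128
        # characters; below that version (or compatibility setting), cap at
        # 30.  Returning None keeps the dialect default of 128.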
1570        if self._get_effective_compat_server_version_info(connection) < (
1571            12,
1572            2,
1573        ):
1574            return 30
1575        else:
1576            # use the default
1577            return None
1578
1579    def _check_unicode_returns(self, connection):
1580        additional_tests = [
1581            expression.cast(
1582                expression.literal_column("'test nvarchar2 returns'"),
1583                sqltypes.NVARCHAR(60),
1584            )
1585        ]
1586        return super(OracleDialect, self)._check_unicode_returns(
1587            connection, additional_tests
1588        )
1589
1590    _isolation_lookup = ["READ COMMITTED", "SERIALIZABLE"]
1591
1592    def get_isolation_level(self, connection):
1593        raise NotImplementedError("implemented by cx_Oracle dialect")
1594
1595    def get_default_isolation_level(self, dbapi_conn):
1596        try:
1597            return self.get_isolation_level(dbapi_conn)
1598        except NotImplementedError:
1599            raise
1600        except Exception:
1601            return "READ COMMITTED"
1602
1603    def set_isolation_level(self, connection, level):
1604        raise NotImplementedError("implemented by cx_Oracle dialect")
1605
1606    def has_table(self, connection, table_name, schema=None):
1607        self._ensure_has_table_connection(connection)
1608
1609        if not schema:
1610            schema = self.default_schema_name
1611
1612        cursor = connection.execute(
1613            sql.text(
1614                "SELECT table_name FROM all_tables "
1615                "WHERE table_name = CAST(:name AS VARCHAR2(128)) "
1616                "AND owner = CAST(:schema_name AS VARCHAR2(128))"
1617            ),
1618            dict(
1619                name=self.denormalize_name(table_name),
1620                schema_name=self.denormalize_name(schema),
1621            ),
1622        )
1623        return cursor.first() is not None
1624
1625    def has_sequence(self, connection, sequence_name, schema=None):
1626        if not schema:
1627            schema = self.default_schema_name
1628        cursor = connection.execute(
1629            sql.text(
1630                "SELECT sequence_name FROM all_sequences "
1631                "WHERE sequence_name = :name AND "
1632                "sequence_owner = :schema_name"
1633            ),
1634            dict(
1635                name=self.denormalize_name(sequence_name),
1636                schema_name=self.denormalize_name(schema),
1637            ),
1638        )
1639        return cursor.first() is not None
1640
1641    def _get_default_schema_name(self, connection):
1642        return self.normalize_name(
1643            connection.exec_driver_sql(
1644                "select sys_context( 'userenv', 'current_schema' ) from dual"
1645            ).scalar()
1646        )
1647
1648    def _resolve_synonym(
1649        self,
1650        connection,
1651        desired_owner=None,
1652        desired_synonym=None,
1653        desired_table=None,
1654    ):
1655        """search for a local synonym matching the given desired owner/name.
1656
1657        if desired_owner is None, a single, unambiguous match is required.
1658
1659        returns the actual name, owner, dblink name, and synonym name if
1660        found.
1661        """
1662
1663        q = (
1664            "SELECT owner, table_owner, table_name, db_link, "
1665            "synonym_name FROM all_synonyms WHERE "
1666        )
1667        clauses = []
1668        params = {}
1669        if desired_synonym:
1670            clauses.append(
1671                "synonym_name = CAST(:synonym_name AS VARCHAR2(128))"
1672            )
1673            params["synonym_name"] = desired_synonym
1674        if desired_owner:
1675            clauses.append("owner = CAST(:desired_owner AS VARCHAR2(128))")
1676            params["desired_owner"] = desired_owner
1677        if desired_table:
1678            clauses.append("table_name = CAST(:tname AS VARCHAR2(128))")
1679            params["tname"] = desired_table
1680
1681        q += " AND ".join(clauses)
1682
1683        result = connection.execution_options(future_result=True).execute(
1684            sql.text(q), params
1685        )
1686        if desired_owner:
1687            row = result.mappings().first()
1688            if row:
1689                return (
1690                    row["table_name"],
1691                    row["table_owner"],
1692                    row["db_link"],
1693                    row["synonym_name"],
1694                )
1695            else:
1696                return None, None, None, None
1697        else:
1698            rows = result.mappings().all()
1699            if len(rows) > 1:
1700                raise AssertionError(
1701                    "There are multiple tables visible to the schema, you "
1702                    "must specify owner"
1703                )
1704            elif len(rows) == 1:
1705                row = rows[0]
1706                return (
1707                    row["table_name"],
1708                    row["table_owner"],
1709                    row["db_link"],
1710                    row["synonym_name"],
1711                )
1712            else:
1713                return None, None, None, None
1714
1715    @reflection.cache
1716    def _prepare_reflection_args(
1717        self,
1718        connection,
1719        table_name,
1720        schema=None,
1721        resolve_synonyms=False,
1722        dblink="",
1723        **kw
1724    ):
1725
1726        if resolve_synonyms:
1727            actual_name, owner, dblink, synonym = self._resolve_synonym(
1728                connection,
1729                desired_owner=self.denormalize_name(schema),
1730                desired_synonym=self.denormalize_name(table_name),
1731            )
1732        else:
1733            actual_name, owner, dblink, synonym = None, None, None, None
1734        if not actual_name:
1735            actual_name = self.denormalize_name(table_name)
1736
1737        if dblink:
1738            # using user_db_links here since all_db_links appears
1739            # to have more restricted permissions.
1740            # https://docs.oracle.com/cd/B28359_01/server.111/b28310/ds_admin005.htm
1741            # will need to hear from more users if we are doing
1742            # the right thing here.  See [ticket:2619]
1743            owner = connection.scalar(
1744                sql.text(
1745                    "SELECT username FROM user_db_links WHERE db_link=:link"
1746                ),
1747                dict(link=dblink),
1748            )
1749            dblink = "@" + dblink
1750        elif not owner:
1751            owner = self.denormalize_name(schema or self.default_schema_name)
1752
1753        return (actual_name, owner, dblink or "", synonym)
1754
1755    @reflection.cache
1756    def get_schema_names(self, connection, **kw):
1757        s = "SELECT username FROM all_users ORDER BY username"
1758        cursor = connection.exec_driver_sql(s)
1759        return [self.normalize_name(row[0]) for row in cursor]
1760
1761    @reflection.cache
1762    def get_table_names(self, connection, schema=None, **kw):
1763        schema = self.denormalize_name(schema or self.default_schema_name)
1764
1765        # note that table_names() isn't loading DBLINKed or synonym'ed tables
1766        if schema is None:
1767            schema = self.default_schema_name
1768
1769        sql_str = "SELECT table_name FROM all_tables WHERE "
1770        if self.exclude_tablespaces:
1771            sql_str += (
1772                "nvl(tablespace_name, 'no tablespace') "
1773                "NOT IN (%s) AND "
1774                % (", ".join(["'%s'" % ts for ts in self.exclude_tablespaces]))
1775            )
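        # IOT_NAME IS NULL filters out the overflow/mapping segments of
        # index-organized tables; DURATION IS NULL filters out global
        # temporary tables, which are reported by get_temp_table_names()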
1776        sql_str += (
1777            "OWNER = :owner AND IOT_NAME IS NULL AND DURATION IS NULL"
1778        )
1779
1780        cursor = connection.execute(sql.text(sql_str), dict(owner=schema))
1781        return [self.normalize_name(row[0]) for row in cursor]
1782
1783    @reflection.cache
1784    def get_temp_table_names(self, connection, **kw):
1785        schema = self.denormalize_name(self.default_schema_name)
1786
1787        sql_str = "SELECT table_name FROM all_tables WHERE "
1788        if self.exclude_tablespaces:
1789            sql_str += (
1790                "nvl(tablespace_name, 'no tablespace') "
1791                "NOT IN (%s) AND "
1792                % (", ".join(["'%s'" % ts for ts in self.exclude_tablespaces]))
1793            )
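        # global temporary tables are identified by a non-NULL DURATION
        # column in ALL_TABLES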
1794        sql_str += (
1795            "OWNER = :owner "
1796            "AND IOT_NAME IS NULL "
1797            "AND DURATION IS NOT NULL"
1798        )
1799
1800        cursor = connection.execute(sql.text(sql_str), dict(owner=schema))
1801        return [self.normalize_name(row[0]) for row in cursor]
1802
1803    @reflection.cache
1804    def get_view_names(self, connection, schema=None, **kw):
1805        schema = self.denormalize_name(schema or self.default_schema_name)
1806        s = sql.text("SELECT view_name FROM all_views WHERE owner = :owner")
1807        cursor = connection.execute(
1808            s, dict(owner=self.denormalize_name(schema))
1809        )
1810        return [self.normalize_name(row[0]) for row in cursor]
1811
1812    @reflection.cache
1813    def get_sequence_names(self, connection, schema=None, **kw):
1814        if not schema:
1815            schema = self.default_schema_name
1816        cursor = connection.execute(
1817            sql.text(
1818                "SELECT sequence_name FROM all_sequences "
1819                "WHERE sequence_owner = :schema_name"
1820            ),
1821            dict(schema_name=self.denormalize_name(schema)),
1822        )
1823        return [self.normalize_name(row[0]) for row in cursor]
1824
1825    @reflection.cache
1826    def get_table_options(self, connection, table_name, schema=None, **kw):
1827        options = {}
1828
1829        resolve_synonyms = kw.get("oracle_resolve_synonyms", False)
1830        dblink = kw.get("dblink", "")
1831        info_cache = kw.get("info_cache")
1832
1833        (table_name, schema, dblink, synonym) = self._prepare_reflection_args(
1834            connection,
1835            table_name,
1836            schema,
1837            resolve_synonyms,
1838            dblink,
1839            info_cache=info_cache,
1840        )
1841
1842        params = {"table_name": table_name}
1843
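        # only query the compression columns on server versions where
        # ALL_TABLES is expected to provide them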
1844        columns = ["table_name"]
1845        if self._supports_table_compression:
1846            columns.append("compression")
1847        if self._supports_table_compress_for:
1848            columns.append("compress_for")
1849
1850        text = (
1851            "SELECT %(columns)s "
1852            "FROM ALL_TABLES%(dblink)s "
1853            "WHERE table_name = CAST(:table_name AS VARCHAR(128))"
1854        )
1855
1856        if schema is not None:
1857            params["owner"] = schema
1858            text += " AND owner = CAST(:owner AS VARCHAR(128)) "
1859        text = text % {"dblink": dblink, "columns": ", ".join(columns)}
1860
1861        result = connection.execute(sql.text(text), params)
1862
1863        enabled = dict(DISABLED=False, ENABLED=True)
1864
1865        row = result.first()
1866        if row:
1867            if "compression" in row._fields and enabled.get(
1868                row.compression, False
1869            ):
1870                if "compress_for" in row._fields:
1871                    options["oracle_compress"] = row.compress_for
1872                else:
1873                    options["oracle_compress"] = True
1874
1875        return options
1876
1877    @reflection.cache
1878    def get_columns(self, connection, table_name, schema=None, **kw):
1879        """Return information about the columns in ``table_name``.
1880
1881        kw arguments can be:
1882
1883            oracle_resolve_synonyms
1884
1885            dblink
1886
1887        """
1888
1889        resolve_synonyms = kw.get("oracle_resolve_synonyms", False)
1890        dblink = kw.get("dblink", "")
1891        info_cache = kw.get("info_cache")
1892
1893        (table_name, schema, dblink, synonym) = self._prepare_reflection_args(
1894            connection,
1895            table_name,
1896            schema,
1897            resolve_synonyms,
1898            dblink,
1899            info_cache=info_cache,
1900        )
1901        columns = []
1902        if self._supports_char_length:
1903            char_length_col = "char_length"
1904        else:
1905            char_length_col = "data_length"
1906
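        # ALL_TAB_IDENTITY_COLS is only available on Oracle 12c and above;
        # earlier versions have no identity columns to reflect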
1907        if self.server_version_info >= (12,):
1908            identity_cols = """\
1909                col.default_on_null,
1910                (
1911                    SELECT id.generation_type || ',' || id.IDENTITY_OPTIONS
1912                    FROM ALL_TAB_IDENTITY_COLS%(dblink)s id
1913                    WHERE col.table_name = id.table_name
1914                    AND col.column_name = id.column_name
1915                    AND col.owner = id.owner
1916                ) AS identity_options""" % {
1917                "dblink": dblink
1918            }
1919        else:
1920            identity_cols = "NULL as default_on_null, NULL as identity_options"
1921
1922        params = {"table_name": table_name}
1923
1924        text = """
1925            SELECT
1926                col.column_name,
1927                col.data_type,
1928                col.%(char_length_col)s,
1929                col.data_precision,
1930                col.data_scale,
1931                col.nullable,
1932                col.data_default,
1933                com.comments,
1934                col.virtual_column,
1935                %(identity_cols)s
1936            FROM all_tab_cols%(dblink)s col
1937            LEFT JOIN all_col_comments%(dblink)s com
1938            ON col.table_name = com.table_name
1939            AND col.column_name = com.column_name
1940            AND col.owner = com.owner
1941            WHERE col.table_name = CAST(:table_name AS VARCHAR2(128))
1942            AND col.hidden_column = 'NO'
1943        """
1944        if schema is not None:
1945            params["owner"] = schema
1946            text += " AND col.owner = :owner "
1947        text += " ORDER BY col.column_id"
1948        text = text % {
1949            "dblink": dblink,
1950            "char_length_col": char_length_col,
1951            "identity_cols": identity_cols,
1952        }
1953
1954        c = connection.execute(sql.text(text), params)
1955
1956        for row in c:
1957            colname = self.normalize_name(row[0])
1958            orig_colname = row[0]
1959            coltype = row[1]
1960            length = row[2]
1961            precision = row[3]
1962            scale = row[4]
1963            nullable = row[5] == "Y"
1964            default = row[6]
1965            comment = row[7]
1966            generated = row[8]
1967            default_on_nul = row[9]
1968            identity_options = row[10]
1969
1970            if coltype == "NUMBER":
1971                if precision is None and scale == 0:
1972                    coltype = INTEGER()
1973                else:
1974                    coltype = NUMBER(precision, scale)
1975            elif coltype == "FLOAT":
1976                # TODO: support "precision" here as "binary_precision"
1977                coltype = FLOAT()
1978            elif coltype in ("VARCHAR2", "NVARCHAR2", "CHAR", "NCHAR"):
1979                coltype = self.ischema_names.get(coltype)(length)
1980            elif "WITH TIME ZONE" in coltype:
1981                coltype = TIMESTAMP(timezone=True)
1982            else:
1983                coltype = re.sub(r"\(\d+\)", "", coltype)
1984                try:
1985                    coltype = self.ischema_names[coltype]
1986                except KeyError:
1987                    util.warn(
1988                        "Did not recognize type '%s' of column '%s'"
1989                        % (coltype, colname)
1990                    )
1991                    coltype = sqltypes.NULLTYPE
1992
1993            if generated == "YES":
1994                computed = dict(sqltext=default)
1995                default = None
1996            else:
1997                computed = None
1998
1999            if identity_options is not None:
2000                identity = self._parse_identity_options(
2001                    identity_options, default_on_nul
2002                )
2003                default = None
2004            else:
2005                identity = None
2006
2007            cdict = {
2008                "name": colname,
2009                "type": coltype,
2010                "nullable": nullable,
2011                "default": default,
2012                "autoincrement": "auto",
2013                "comment": comment,
2014            }
2015            if orig_colname.lower() == orig_colname:
2016                cdict["quote"] = True
2017            if computed is not None:
2018                cdict["computed"] = computed
2019            if identity is not None:
2020                cdict["identity"] = identity
2021
2022            columns.append(cdict)
2023        return columns
2024
2025    def _parse_identity_options(self, identity_options, default_on_nul):
2026        # identity_options is a string that starts with 'ALWAYS,' or
2027        # 'BY DEFAULT,' and continues with
2028        # START WITH: 1, INCREMENT BY: 1, MAX_VALUE: 123, MIN_VALUE: 1,
2029        # CYCLE_FLAG: N, CACHE_SIZE: 1, ORDER_FLAG: N, SCALE_FLAG: N,
2030        # EXTEND_FLAG: N, SESSION_FLAG: N, KEEP_VALUE: N
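        # e.g. "BY DEFAULT, START WITH: 1, INCREMENT BY: 1, MAX_VALUE: 99,
        # MIN_VALUE: 1, CYCLE_FLAG: N, CACHE_SIZE: 20, ORDER_FLAG: N, ..."
        # parses to {"always": False, "on_null": ..., "start": 1,
        # "increment": 1, "maxvalue": 99, "minvalue": 1, "cycle": False,
        # "cache": 20, "order": False}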
2031        parts = [p.strip() for p in identity_options.split(",")]
2032        identity = {
2033            "always": parts[0] == "ALWAYS",
2034            "on_null": default_on_nul == "YES",
2035        }
2036
2037        for part in parts[1:]:
2038            option, value = part.split(":")
2039            value = value.strip()
2040
2041            if "START WITH" in option:
2042                identity["start"] = compat.long_type(value)
2043            elif "INCREMENT BY" in option:
2044                identity["increment"] = compat.long_type(value)
2045            elif "MAX_VALUE" in option:
2046                identity["maxvalue"] = compat.long_type(value)
2047            elif "MIN_VALUE" in option:
2048                identity["minvalue"] = compat.long_type(value)
2049            elif "CYCLE_FLAG" in option:
2050                identity["cycle"] = value == "Y"
2051            elif "CACHE_SIZE" in option:
2052                identity["cache"] = compat.long_type(value)
2053            elif "ORDER_FLAG" in option:
2054                identity["order"] = value == "Y"
2055        return identity
2056
2057    @reflection.cache
2058    def get_table_comment(
2059        self,
2060        connection,
2061        table_name,
2062        schema=None,
2063        resolve_synonyms=False,
2064        dblink="",
2065        **kw
2066    ):
2067
2068        info_cache = kw.get("info_cache")
2069        (table_name, schema, dblink, synonym) = self._prepare_reflection_args(
2070            connection,
2071            table_name,
2072            schema,
2073            resolve_synonyms,
2074            dblink,
2075            info_cache=info_cache,
2076        )
2077
2078        if not schema:
2079            schema = self.default_schema_name
2080
2081        COMMENT_SQL = """
2082            SELECT comments
2083            FROM all_tab_comments
2084            WHERE table_name = CAST(:table_name AS VARCHAR(128))
2085            AND owner = CAST(:schema_name AS VARCHAR(128))
2086        """
2087
2088        c = connection.execute(
2089            sql.text(COMMENT_SQL),
2090            dict(table_name=table_name, schema_name=schema),
2091        )
2092        return {"text": c.scalar()}
2093
2094    @reflection.cache
2095    def get_indexes(
2096        self,
2097        connection,
2098        table_name,
2099        schema=None,
2100        resolve_synonyms=False,
2101        dblink="",
2102        **kw
2103    ):
2104
2105        info_cache = kw.get("info_cache")
2106        (table_name, schema, dblink, synonym) = self._prepare_reflection_args(
2107            connection,
2108            table_name,
2109            schema,
2110            resolve_synonyms,
2111            dblink,
2112            info_cache=info_cache,
2113        )
2115
2116        params = {"table_name": table_name}
2117        text = (
2118            "SELECT a.index_name, a.column_name, "
2119            "\nb.index_type, b.uniqueness, b.compression, b.prefix_length "
2120            "\nFROM ALL_IND_COLUMNS%(dblink)s a, "
2121            "\nALL_INDEXES%(dblink)s b "
2122            "\nWHERE "
2123            "\na.index_name = b.index_name "
2124            "\nAND a.table_owner = b.table_owner "
2125            "\nAND a.table_name = b.table_name "
2126            "\nAND a.table_name = CAST(:table_name AS VARCHAR(128))"
2127        )
2128
2129        if schema is not None:
2130            params["schema"] = schema
2131            text += "\nAND a.table_owner = :schema "
2132
2133        text += "\nORDER BY a.index_name, a.column_position"
2134
2135        text = text % {"dblink": dblink}
2136
2137        q = sql.text(text)
2138        rp = connection.execute(q, params)
2139        indexes = []
2140        last_index_name = None
2141        pk_constraint = self.get_pk_constraint(
2142            connection,
2143            table_name,
2144            schema,
2145            resolve_synonyms=resolve_synonyms,
2146            dblink=dblink,
2147            info_cache=kw.get("info_cache"),
2148        )
2149
2150        uniqueness = dict(NONUNIQUE=False, UNIQUE=True)
2151        enabled = dict(DISABLED=False, ENABLED=True)
2152
2153        oracle_sys_col = re.compile(r"SYS_NC\d+\$", re.IGNORECASE)
2154
2155        index = None
2156        for rset in rp:
2157            index_name_normalized = self.normalize_name(rset.index_name)
2158
2159            # skip primary key index.  This is refined as of
2160            # [ticket:5421].  Note that ALL_INDEXES.GENERATED will be "Y"
2161            # if the name of this index was generated by Oracle, however
2162            # if a named primary key constraint was created then this flag
2163            # is false.
2164            if (
2165                pk_constraint
2166                and index_name_normalized == pk_constraint["name"]
2167            ):
2168                continue
2169
2170            if rset.index_name != last_index_name:
2171                index = dict(
2172                    name=index_name_normalized,
2173                    column_names=[],
2174                    dialect_options={},
2175                )
2176                indexes.append(index)
2177            index["unique"] = uniqueness.get(rset.uniqueness, False)
2178
2179            if rset.index_type in ("BITMAP", "FUNCTION-BASED BITMAP"):
2180                index["dialect_options"]["oracle_bitmap"] = True
2181            if enabled.get(rset.compression, False):
2182                index["dialect_options"][
2183                    "oracle_compress"
2184                ] = rset.prefix_length
2185
2186            # filter out Oracle SYS_NC names.  could also do an outer join
2187            # to the all_tab_columns table and check for real col names there.
2188            if not oracle_sys_col.match(rset.column_name):
2189                index["column_names"].append(
2190                    self.normalize_name(rset.column_name)
2191                )
2192            last_index_name = rset.index_name
2193
2194        return indexes
2195
2196    @reflection.cache
2197    def _get_constraint_data(
2198        self, connection, table_name, schema=None, dblink="", **kw
2199    ):
2200
2201        params = {"table_name": table_name}
2202
2203        text = (
2204            "SELECT"
2205            "\nac.constraint_name,"  # 0
2206            "\nac.constraint_type,"  # 1
2207            "\nloc.column_name AS local_column,"  # 2
2208            "\nrem.table_name AS remote_table,"  # 3
2209            "\nrem.column_name AS remote_column,"  # 4
2210            "\nrem.owner AS remote_owner,"  # 5
2211            "\nloc.position as loc_pos,"  # 6
2212            "\nrem.position as rem_pos,"  # 7
2213            "\nac.search_condition,"  # 8
2214            "\nac.delete_rule"  # 9
2215            "\nFROM all_constraints%(dblink)s ac,"
2216            "\nall_cons_columns%(dblink)s loc,"
2217            "\nall_cons_columns%(dblink)s rem"
2218            "\nWHERE ac.table_name = CAST(:table_name AS VARCHAR2(128))"
2219            "\nAND ac.constraint_type IN ('R','P', 'U', 'C')"
2220        )
2221
2222        if schema is not None:
2223            params["owner"] = schema
2224            text += "\nAND ac.owner = CAST(:owner AS VARCHAR2(128))"
2225
2226        text += (
2227            "\nAND ac.owner = loc.owner"
2228            "\nAND ac.constraint_name = loc.constraint_name"
2229            "\nAND ac.r_owner = rem.owner(+)"
2230            "\nAND ac.r_constraint_name = rem.constraint_name(+)"
2231            "\nAND (rem.position IS NULL or loc.position=rem.position)"
2232            "\nORDER BY ac.constraint_name, loc.position"
2233        )
2234
2235        text = text % {"dblink": dblink}
2236        rp = connection.execute(sql.text(text), params)
2237        constraint_data = rp.fetchall()
2238        return constraint_data
2239
2240    @reflection.cache
2241    def get_pk_constraint(self, connection, table_name, schema=None, **kw):
2242        resolve_synonyms = kw.get("oracle_resolve_synonyms", False)
2243        dblink = kw.get("dblink", "")
2244        info_cache = kw.get("info_cache")
2245
2246        (table_name, schema, dblink, synonym) = self._prepare_reflection_args(
2247            connection,
2248            table_name,
2249            schema,
2250            resolve_synonyms,
2251            dblink,
2252            info_cache=info_cache,
2253        )
2254        pkeys = []
2255        constraint_name = None
2256        constraint_data = self._get_constraint_data(
2257            connection,
2258            table_name,
2259            schema,
2260            dblink,
2261            info_cache=kw.get("info_cache"),
2262        )
2263
2264        for row in constraint_data:
2265            (
2266                cons_name,
2267                cons_type,
2268                local_column,
2269                remote_table,
2270                remote_column,
2271                remote_owner,
2272            ) = row[0:2] + tuple([self.normalize_name(x) for x in row[2:6]])
2273            if cons_type == "P":
2274                if constraint_name is None:
2275                    constraint_name = self.normalize_name(cons_name)
2276                pkeys.append(local_column)
2277        return {"constrained_columns": pkeys, "name": constraint_name}
2278
2279    @reflection.cache
2280    def get_foreign_keys(self, connection, table_name, schema=None, **kw):
2281        """Return foreign key information for ``table_name``.
2282
2283        kw arguments can be:
2284
2285            oracle_resolve_synonyms
2286
2287            dblink
2288
2289        """
2290        requested_schema = schema  # to check later on
2291        resolve_synonyms = kw.get("oracle_resolve_synonyms", False)
2292        dblink = kw.get("dblink", "")
2293        info_cache = kw.get("info_cache")
2294
2295        (table_name, schema, dblink, synonym) = self._prepare_reflection_args(
2296            connection,
2297            table_name,
2298            schema,
2299            resolve_synonyms,
2300            dblink,
2301            info_cache=info_cache,
2302        )
2303
2304        constraint_data = self._get_constraint_data(
2305            connection,
2306            table_name,
2307            schema,
2308            dblink,
2309            info_cache=kw.get("info_cache"),
2310        )
2311
2312        def fkey_rec():
2313            return {
2314                "name": None,
2315                "constrained_columns": [],
2316                "referred_schema": None,
2317                "referred_table": None,
2318                "referred_columns": [],
2319                "options": {},
2320            }
2321
2322        fkeys = util.defaultdict(fkey_rec)
2323
2324        for row in constraint_data:
2325            (
2326                cons_name,
2327                cons_type,
2328                local_column,
2329                remote_table,
2330                remote_column,
2331                remote_owner,
2332            ) = row[0:2] + tuple([self.normalize_name(x) for x in row[2:6]])
2333
2334            cons_name = self.normalize_name(cons_name)
2335
2336            if cons_type == "R":
2337                if remote_table is None:
2338                    # ticket 363
2339                    util.warn(
2340                        (
2341                            "Got 'None' querying 'table_name' from "
2342                            "all_cons_columns%(dblink)s - does the user have "
2343                            "proper rights to the table?"
2344                        )
2345                        % {"dblink": dblink}
2346                    )
2347                    continue
2348
2349                rec = fkeys[cons_name]
2350                rec["name"] = cons_name
2351                local_cols, remote_cols = (
2352                    rec["constrained_columns"],
2353                    rec["referred_columns"],
2354                )
2355
2356                if not rec["referred_table"]:
2357                    if resolve_synonyms:
2358                        (
2359                            ref_remote_name,
2360                            ref_remote_owner,
2361                            ref_dblink,
2362                            ref_synonym,
2363                        ) = self._resolve_synonym(
2364                            connection,
2365                            desired_owner=self.denormalize_name(remote_owner),
2366                            desired_table=self.denormalize_name(remote_table),
2367                        )
2368                        if ref_synonym:
2369                            remote_table = self.normalize_name(ref_synonym)
2370                            remote_owner = self.normalize_name(
2371                                ref_remote_owner
2372                            )
2373
2374                    rec["referred_table"] = remote_table
2375
2376                    if (
2377                        requested_schema is not None
2378                        or self.denormalize_name(remote_owner) != schema
2379                    ):
2380                        rec["referred_schema"] = remote_owner
2381
2382                    if row[9] != "NO ACTION":
2383                        rec["options"]["ondelete"] = row[9]
2384
2385                local_cols.append(local_column)
2386                remote_cols.append(remote_column)
2387
2388        return list(fkeys.values())
2389
2390    @reflection.cache
2391    def get_unique_constraints(
2392        self, connection, table_name, schema=None, **kw
2393    ):
2394        resolve_synonyms = kw.get("oracle_resolve_synonyms", False)
2395        dblink = kw.get("dblink", "")
2396        info_cache = kw.get("info_cache")
2397
2398        (table_name, schema, dblink, synonym) = self._prepare_reflection_args(
2399            connection,
2400            table_name,
2401            schema,
2402            resolve_synonyms,
2403            dblink,
2404            info_cache=info_cache,
2405        )
2406
2407        constraint_data = self._get_constraint_data(
2408            connection,
2409            table_name,
2410            schema,
2411            dblink,
2412            info_cache=kw.get("info_cache"),
2413        )
2414
2415        unique_keys = filter(lambda x: x[1] == "U", constraint_data)
2416        uniques_group = groupby(unique_keys, lambda x: x[0])
2417
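        # Oracle backs a UNIQUE constraint with an index of the same name;
        # report it as "duplicates_index" so callers can tell that the index
        # is an artifact of the constraint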
2418        index_names = {
2419            ix["name"]
2420            for ix in self.get_indexes(connection, table_name, schema=schema)
2421        }
2422        return [
2423            {
2424                "name": name,
2425                "column_names": cols,
2426                "duplicates_index": name if name in index_names else None,
2427            }
2428            for name, cols in [
2429                [
2430                    self.normalize_name(i[0]),
2431                    [self.normalize_name(x[2]) for x in i[1]],
2432                ]
2433                for i in uniques_group
2434            ]
2435        ]
2436
2437    @reflection.cache
2438    def get_view_definition(
2439        self,
2440        connection,
2441        view_name,
2442        schema=None,
2443        resolve_synonyms=False,
2444        dblink="",
2445        **kw
2446    ):
2447        info_cache = kw.get("info_cache")
2448        (view_name, schema, dblink, synonym) = self._prepare_reflection_args(
2449            connection,
2450            view_name,
2451            schema,
2452            resolve_synonyms,
2453            dblink,
2454            info_cache=info_cache,
2455        )
2456
2457        params = {"view_name": view_name}
2458        text = "SELECT text FROM all_views WHERE view_name=:view_name"
2459
2460        if schema is not None:
2461            text += " AND owner = :schema"
2462            params["schema"] = schema
2463
2464        rp = connection.execute(sql.text(text), params).scalar()
2465        if rp:
2466            if util.py2k:
2467                rp = rp.decode(self.encoding)
2468            return rp
2469        else:
2470            return None
2471
2472    @reflection.cache
2473    def get_check_constraints(
2474        self, connection, table_name, schema=None, include_all=False, **kw
2475    ):
2476        resolve_synonyms = kw.get("oracle_resolve_synonyms", False)
2477        dblink = kw.get("dblink", "")
2478        info_cache = kw.get("info_cache")
2479
2480        (table_name, schema, dblink, synonym) = self._prepare_reflection_args(
2481            connection,
2482            table_name,
2483            schema,
2484            resolve_synonyms,
2485            dblink,
2486            info_cache=info_cache,
2487        )
2488
2489        constraint_data = self._get_constraint_data(
2490            connection,
2491            table_name,
2492            schema,
2493            dblink,
2494            info_cache=kw.get("info_cache"),
2495        )
2496
2497        check_constraints = filter(lambda x: x[1] == "C", constraint_data)
2498
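        # Oracle reports NOT NULL columns as CHECK constraints with a search
        # condition of the form '"COLNAME" IS NOT NULL'; skip those unless
        # include_all is set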
2499        return [
2500            {"name": self.normalize_name(cons[0]), "sqltext": cons[8]}
2501            for cons in check_constraints
2502            if include_all or not re.match(r"..+?. IS NOT NULL$", cons[8])
2503        ]
2504
2505
2506class _OuterJoinColumn(sql.ClauseElement):
2507    __visit_name__ = "outer_join_column"
2508
2509    def __init__(self, column):
2510        self.column = column
2511