1# oracle/base.py
2# Copyright (C) 2005-2018 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: http://www.opensource.org/licenses/mit-license.php
7
8"""
9.. dialect:: oracle
10    :name: Oracle
11
12    Oracle version 8 through current (11g at the time of this writing) are
13    supported.
14
15Connect Arguments
16-----------------
17
18The dialect supports several :func:`~sqlalchemy.create_engine()` arguments
19which affect the behavior of the dialect regardless of driver in use.
20
21* ``use_ansi`` - Use ANSI JOIN constructs (see the section on Oracle 8).
22  Defaults to ``True``.  If ``False``, Oracle-8 compatible constructs are used
23  for joins.
24
25* ``optimize_limits`` - defaults to ``False``. see the section on
26  LIMIT/OFFSET.
27
28* ``use_binds_for_limits`` - defaults to ``True``.  see the section on
29  LIMIT/OFFSET.
30
31Auto Increment Behavior
32-----------------------
33
34SQLAlchemy Table objects which include integer primary keys are usually
35assumed to have "autoincrementing" behavior, meaning they can generate their
36own primary key values upon INSERT.  Since Oracle has no "autoincrement"
37feature, SQLAlchemy relies upon sequences to produce these values.   With the
38Oracle dialect, *a sequence must always be explicitly specified to enable
39autoincrement*.  This is divergent with the majority of documentation
40examples which assume the usage of an autoincrement-capable database.   To
41specify sequences, use the sqlalchemy.schema.Sequence object which is passed
42to a Column construct::
43
44  t = Table('mytable', metadata,
45        Column('id', Integer, Sequence('id_seq'), primary_key=True),
46        Column(...), ...
47  )
48
49This step is also required when using table reflection, i.e. autoload=True::
50
51  t = Table('mytable', metadata,
52        Column('id', Integer, Sequence('id_seq'), primary_key=True),
53        autoload=True
54  )
55
56Identifier Casing
57-----------------
58
59In Oracle, the data dictionary represents all case insensitive identifier
60names using UPPERCASE text.   SQLAlchemy on the other hand considers an
61all-lower case identifier name to be case insensitive.   The Oracle dialect
62converts all case insensitive identifiers to and from those two formats during
63schema level communication, such as reflection of tables and indexes.   Using
64an UPPERCASE name on the SQLAlchemy side indicates a case sensitive
65identifier, and SQLAlchemy will quote the name - this will cause mismatches
66against data dictionary data received from Oracle, so unless identifier names
67have been truly created as case sensitive (i.e. using quoted names), all
68lowercase names should be used on the SQLAlchemy side.
69
70
71LIMIT/OFFSET Support
72--------------------
73
74Oracle has no support for the LIMIT or OFFSET keywords.  SQLAlchemy uses
75a wrapped subquery approach in conjunction with ROWNUM.  The exact methodology
76is taken from
77http://www.oracle.com/technetwork/issue-archive/2006/06-sep/o56asktom-086197.html .
78
79There are two options which affect its behavior:
80
81* the "FIRST ROWS()" optimization keyword is not used by default.  To enable
82  the usage of this optimization directive, specify ``optimize_limits=True``
83  to :func:`.create_engine`.
84* the values passed for the limit/offset are sent as bound parameters.   Some
85  users have observed that Oracle produces a poor query plan when the values
86  are sent as binds and not rendered literally.   To render the limit/offset
87  values literally within the SQL statement, specify
88  ``use_binds_for_limits=False`` to :func:`.create_engine`.
89
90Some users have reported better performance when the entirely different
91approach of a window query is used, i.e. ROW_NUMBER() OVER (ORDER BY), to
92provide LIMIT/OFFSET (note that the majority of users don't observe this).
93To suit this case the method used for LIMIT/OFFSET can be replaced entirely.
94See the recipe at
95http://www.sqlalchemy.org/trac/wiki/UsageRecipes/WindowFunctionsByDefault
96which installs a select compiler that overrides the generation of limit/offset
97with a window function.
98
99.. _oracle_returning:
100
101RETURNING Support
102-----------------
103
104The Oracle database supports a limited form of RETURNING, in order to retrieve
105result sets of matched rows from INSERT, UPDATE and DELETE statements.
106Oracle's RETURNING..INTO syntax only supports one row being returned, as it
107relies upon OUT parameters in order to function.  In addition, supported
108DBAPIs have further limitations (see :ref:`cx_oracle_returning`).
109
110SQLAlchemy's "implicit returning" feature, which employs RETURNING within an
111INSERT and sometimes an UPDATE statement in order to fetch newly generated
112primary key values and other SQL defaults and expressions, is normally enabled
113on the Oracle backend.  By default, "implicit returning" typically only
114fetches the value of a single ``nextval(some_seq)`` expression embedded into
115an INSERT in order to increment a sequence within an INSERT statement and get
116the value back at the same time. To disable this feature across the board,
117specify ``implicit_returning=False`` to :func:`.create_engine`::
118
119    engine = create_engine("oracle://scott:tiger@dsn",
120                           implicit_returning=False)
121
122Implicit returning can also be disabled on a table-by-table basis as a table
123option::
124
125    # Core Table
126    my_table = Table("my_table", metadata, ..., implicit_returning=False)
127
128
129    # declarative
130    class MyClass(Base):
131        __tablename__ = 'my_table'
132        __table_args__ = {"implicit_returning": False}
133
134.. seealso::
135
136    :ref:`cx_oracle_returning` - additional cx_oracle-specific restrictions on
137    implicit returning.
138
139ON UPDATE CASCADE
140-----------------
141
142Oracle doesn't have native ON UPDATE CASCADE functionality.  A trigger based
143solution is available at
144http://asktom.oracle.com/tkyte/update_cascade/index.html .
145
146When using the SQLAlchemy ORM, the ORM has limited ability to manually issue
147cascading updates - specify ForeignKey objects using the
148"deferrable=True, initially='deferred'" keyword arguments,
149and specify "passive_updates=False" on each relationship().
150
151Oracle 8 Compatibility
152----------------------
153
154When Oracle 8 is detected, the dialect internally configures itself to the
155following behaviors:
156
157* the use_ansi flag is set to False.  This has the effect of converting all
158  JOIN phrases into the WHERE clause, and in the case of LEFT OUTER JOIN
159  makes use of Oracle's (+) operator.
160
161* the NVARCHAR2 and NCLOB datatypes are no longer generated as DDL when
162  the :class:`~sqlalchemy.types.Unicode` is used - VARCHAR2 and CLOB are
163  issued instead.   This because these types don't seem to work correctly on
164  Oracle 8 even though they are available.  The
165  :class:`~sqlalchemy.types.NVARCHAR` and
166  :class:`~sqlalchemy.dialects.oracle.NCLOB` types will always generate
167  NVARCHAR2 and NCLOB.
168
169* the "native unicode" mode is disabled when using cx_oracle, i.e. SQLAlchemy
170  encodes all Python unicode objects to "string" before passing in as bind
171  parameters.
172
173Synonym/DBLINK Reflection
174-------------------------
175
176When using reflection with Table objects, the dialect can optionally search
177for tables indicated by synonyms, either in local or remote schemas or
178accessed over DBLINK, by passing the flag ``oracle_resolve_synonyms=True`` as
179a keyword argument to the :class:`.Table` construct::
180
181    some_table = Table('some_table', autoload=True,
182                                autoload_with=some_engine,
183                                oracle_resolve_synonyms=True)
184
185When this flag is set, the given name (such as ``some_table`` above) will
186be searched not just in the ``ALL_TABLES`` view, but also within the
187``ALL_SYNONYMS`` view to see if this name is actually a synonym to another
188name.  If the synonym is located and refers to a DBLINK, the oracle dialect
189knows how to locate the table's information using DBLINK syntax(e.g.
190``@dblink``).
191
192``oracle_resolve_synonyms`` is accepted wherever reflection arguments are
193accepted, including methods such as :meth:`.MetaData.reflect` and
194:meth:`.Inspector.get_columns`.
195
196If synonyms are not in use, this flag should be left disabled.
197
198Table names with SYSTEM/SYSAUX tablespaces
199-------------------------------------------
200
201The :meth:`.Inspector.get_table_names` and
202:meth:`.Inspector.get_temp_table_names`
203methods each return a list of table names for the current engine. These methods
204are also part of the reflection which occurs within an operation such as
205:meth:`.MetaData.reflect`.  By default, these operations exclude the ``SYSTEM``
206and ``SYSAUX`` tablespaces from the operation.   In order to change this, the
207default list of tablespaces excluded can be changed at the engine level using
208the ``exclude_tablespaces`` parameter::
209
210    # exclude SYSAUX and SOME_TABLESPACE, but not SYSTEM
211    e = create_engine(
212      "oracle://scott:tiger@xe",
213      exclude_tablespaces=["SYSAUX", "SOME_TABLESPACE"])
214
215.. versionadded:: 1.1
216
217DateTime Compatibility
218----------------------
219
220Oracle has no datatype known as ``DATETIME``, it instead has only ``DATE``,
221which can actually store a date and time value.  For this reason, the Oracle
222dialect provides a type :class:`.oracle.DATE` which is a subclass of
223:class:`.DateTime`.   This type has no special behavior, and is only
224present as a "marker" for this type; additionally, when a database column
225is reflected and the type is reported as ``DATE``, the time-supporting
226:class:`.oracle.DATE` type is used.
227
228.. versionchanged:: 0.9.4 Added :class:`.oracle.DATE` to subclass
229   :class:`.DateTime`.  This is a change as previous versions
230   would reflect a ``DATE`` column as :class:`.types.DATE`, which subclasses
231   :class:`.Date`.   The only significance here is for schemes that are
232   examining the type of column for use in special Python translations or
233   for migrating schemas to other database backends.
234
235.. _oracle_table_options:
236
237Oracle Table Options
238-------------------------
239
240The CREATE TABLE phrase supports the following options with Oracle
241in conjunction with the :class:`.Table` construct:
242
243
244* ``ON COMMIT``::
245
246    Table(
247        "some_table", metadata, ...,
248        prefixes=['GLOBAL TEMPORARY'], oracle_on_commit='PRESERVE ROWS')
249
250.. versionadded:: 1.0.0
251
252* ``COMPRESS``::
253
254    Table('mytable', metadata, Column('data', String(32)),
255        oracle_compress=True)
256
257    Table('mytable', metadata, Column('data', String(32)),
258        oracle_compress=6)
259
260   The ``oracle_compress`` parameter accepts either an integer compression
261   level, or ``True`` to use the default compression level.
262
263.. versionadded:: 1.0.0
264
265.. _oracle_index_options:
266
267Oracle Specific Index Options
268-----------------------------
269
270Bitmap Indexes
271~~~~~~~~~~~~~~
272
273You can specify the ``oracle_bitmap`` parameter to create a bitmap index
274instead of a B-tree index::
275
276    Index('my_index', my_table.c.data, oracle_bitmap=True)
277
278Bitmap indexes cannot be unique and cannot be compressed. SQLAlchemy will not
279check for such limitations, only the database will.
280
281.. versionadded:: 1.0.0
282
283Index compression
284~~~~~~~~~~~~~~~~~
285
286Oracle has a more efficient storage mode for indexes containing lots of
287repeated values. Use the ``oracle_compress`` parameter to turn on key c
288ompression::
289
290    Index('my_index', my_table.c.data, oracle_compress=True)
291
292    Index('my_index', my_table.c.data1, my_table.c.data2, unique=True,
293           oracle_compress=1)
294
295The ``oracle_compress`` parameter accepts either an integer specifying the
296number of prefix columns to compress, or ``True`` to use the default (all
297columns for non-unique indexes, all but the last column for unique indexes).
298
299.. versionadded:: 1.0.0
300
301"""
302
303import re
304
305from sqlalchemy import util, sql
306from sqlalchemy.engine import default, reflection
307from sqlalchemy.sql import compiler, visitors, expression, util as sql_util
308from sqlalchemy.sql import operators as sql_operators
309from sqlalchemy.sql.elements import quoted_name
310from sqlalchemy import types as sqltypes, schema as sa_schema
311from sqlalchemy.types import VARCHAR, NVARCHAR, CHAR, \
312    BLOB, CLOB, TIMESTAMP, FLOAT
313
314RESERVED_WORDS = \
315    set('SHARE RAW DROP BETWEEN FROM DESC OPTION PRIOR LONG THEN '
316        'DEFAULT ALTER IS INTO MINUS INTEGER NUMBER GRANT IDENTIFIED '
317        'ALL TO ORDER ON FLOAT DATE HAVING CLUSTER NOWAIT RESOURCE '
318        'ANY TABLE INDEX FOR UPDATE WHERE CHECK SMALLINT WITH DELETE '
319        'BY ASC REVOKE LIKE SIZE RENAME NOCOMPRESS NULL GROUP VALUES '
320        'AS IN VIEW EXCLUSIVE COMPRESS SYNONYM SELECT INSERT EXISTS '
321        'NOT TRIGGER ELSE CREATE INTERSECT PCTFREE DISTINCT USER '
322        'CONNECT SET MODE OF UNIQUE VARCHAR2 VARCHAR LOCK OR CHAR '
323        'DECIMAL UNION PUBLIC AND START UID COMMENT CURRENT LEVEL'.split())
324
325NO_ARG_FNS = set('UID CURRENT_DATE SYSDATE USER '
326                 'CURRENT_TIME CURRENT_TIMESTAMP'.split())
327
328
329class RAW(sqltypes._Binary):
330    __visit_name__ = 'RAW'
331OracleRaw = RAW
332
333
334class NCLOB(sqltypes.Text):
335    __visit_name__ = 'NCLOB'
336
337
338class VARCHAR2(VARCHAR):
339    __visit_name__ = 'VARCHAR2'
340
341NVARCHAR2 = NVARCHAR
342
343
344class NUMBER(sqltypes.Numeric, sqltypes.Integer):
345    __visit_name__ = 'NUMBER'
346
347    def __init__(self, precision=None, scale=None, asdecimal=None):
348        if asdecimal is None:
349            asdecimal = bool(scale and scale > 0)
350
351        super(NUMBER, self).__init__(
352            precision=precision, scale=scale, asdecimal=asdecimal)
353
354    def adapt(self, impltype):
355        ret = super(NUMBER, self).adapt(impltype)
356        # leave a hint for the DBAPI handler
357        ret._is_oracle_number = True
358        return ret
359
360    @property
361    def _type_affinity(self):
362        if bool(self.scale and self.scale > 0):
363            return sqltypes.Numeric
364        else:
365            return sqltypes.Integer
366
367
368class DOUBLE_PRECISION(sqltypes.Numeric):
369    __visit_name__ = 'DOUBLE_PRECISION'
370
371    def __init__(self, precision=None, scale=None, asdecimal=None):
372        if asdecimal is None:
373            asdecimal = False
374
375        super(DOUBLE_PRECISION, self).__init__(
376            precision=precision, scale=scale, asdecimal=asdecimal)
377
378
379class BFILE(sqltypes.LargeBinary):
380    __visit_name__ = 'BFILE'
381
382
383class LONG(sqltypes.Text):
384    __visit_name__ = 'LONG'
385
386
387class DATE(sqltypes.DateTime):
388    """Provide the oracle DATE type.
389
390    This type has no special Python behavior, except that it subclasses
391    :class:`.types.DateTime`; this is to suit the fact that the Oracle
392    ``DATE`` type supports a time value.
393
394    .. versionadded:: 0.9.4
395
396    """
397    __visit_name__ = 'DATE'
398
399    def _compare_type_affinity(self, other):
400        return other._type_affinity in (sqltypes.DateTime, sqltypes.Date)
401
402
403class INTERVAL(sqltypes.TypeEngine):
404    __visit_name__ = 'INTERVAL'
405
406    def __init__(self,
407                 day_precision=None,
408                 second_precision=None):
409        """Construct an INTERVAL.
410
411        Note that only DAY TO SECOND intervals are currently supported.
412        This is due to a lack of support for YEAR TO MONTH intervals
413        within available DBAPIs (cx_oracle and zxjdbc).
414
415        :param day_precision: the day precision value.  this is the number of
416          digits to store for the day field.  Defaults to "2"
417        :param second_precision: the second precision value.  this is the
418          number of digits to store for the fractional seconds field.
419          Defaults to "6".
420
421        """
422        self.day_precision = day_precision
423        self.second_precision = second_precision
424
425    @classmethod
426    def _adapt_from_generic_interval(cls, interval):
427        return INTERVAL(day_precision=interval.day_precision,
428                        second_precision=interval.second_precision)
429
430    @property
431    def _type_affinity(self):
432        return sqltypes.Interval
433
434
435class ROWID(sqltypes.TypeEngine):
436    """Oracle ROWID type.
437
438    When used in a cast() or similar, generates ROWID.
439
440    """
441    __visit_name__ = 'ROWID'
442
443
444class _OracleBoolean(sqltypes.Boolean):
445    def get_dbapi_type(self, dbapi):
446        return dbapi.NUMBER
447
448colspecs = {
449    sqltypes.Boolean: _OracleBoolean,
450    sqltypes.Interval: INTERVAL,
451    sqltypes.DateTime: DATE
452}
453
454ischema_names = {
455    'VARCHAR2': VARCHAR,
456    'NVARCHAR2': NVARCHAR,
457    'CHAR': CHAR,
458    'DATE': DATE,
459    'NUMBER': NUMBER,
460    'BLOB': BLOB,
461    'BFILE': BFILE,
462    'CLOB': CLOB,
463    'NCLOB': NCLOB,
464    'TIMESTAMP': TIMESTAMP,
465    'TIMESTAMP WITH TIME ZONE': TIMESTAMP,
466    'INTERVAL DAY TO SECOND': INTERVAL,
467    'RAW': RAW,
468    'FLOAT': FLOAT,
469    'DOUBLE PRECISION': DOUBLE_PRECISION,
470    'LONG': LONG,
471}
472
473
474class OracleTypeCompiler(compiler.GenericTypeCompiler):
475    # Note:
476    # Oracle DATE == DATETIME
477    # Oracle does not allow milliseconds in DATE
478    # Oracle does not support TIME columns
479
480    def visit_datetime(self, type_, **kw):
481        return self.visit_DATE(type_, **kw)
482
483    def visit_float(self, type_, **kw):
484        return self.visit_FLOAT(type_, **kw)
485
486    def visit_unicode(self, type_, **kw):
487        if self.dialect._supports_nchar:
488            return self.visit_NVARCHAR2(type_, **kw)
489        else:
490            return self.visit_VARCHAR2(type_, **kw)
491
492    def visit_INTERVAL(self, type_, **kw):
493        return "INTERVAL DAY%s TO SECOND%s" % (
494            type_.day_precision is not None and
495            "(%d)" % type_.day_precision or
496            "",
497            type_.second_precision is not None and
498            "(%d)" % type_.second_precision or
499            "",
500        )
501
502    def visit_LONG(self, type_, **kw):
503        return "LONG"
504
505    def visit_TIMESTAMP(self, type_, **kw):
506        if type_.timezone:
507            return "TIMESTAMP WITH TIME ZONE"
508        else:
509            return "TIMESTAMP"
510
511    def visit_DOUBLE_PRECISION(self, type_, **kw):
512        return self._generate_numeric(type_, "DOUBLE PRECISION", **kw)
513
514    def visit_NUMBER(self, type_, **kw):
515        return self._generate_numeric(type_, "NUMBER", **kw)
516
517    def _generate_numeric(self, type_, name, precision=None, scale=None, **kw):
518        if precision is None:
519            precision = type_.precision
520
521        if scale is None:
522            scale = getattr(type_, 'scale', None)
523
524        if precision is None:
525            return name
526        elif scale is None:
527            n = "%(name)s(%(precision)s)"
528            return n % {'name': name, 'precision': precision}
529        else:
530            n = "%(name)s(%(precision)s, %(scale)s)"
531            return n % {'name': name, 'precision': precision, 'scale': scale}
532
533    def visit_string(self, type_, **kw):
534        return self.visit_VARCHAR2(type_, **kw)
535
536    def visit_VARCHAR2(self, type_, **kw):
537        return self._visit_varchar(type_, '', '2')
538
539    def visit_NVARCHAR2(self, type_, **kw):
540        return self._visit_varchar(type_, 'N', '2')
541    visit_NVARCHAR = visit_NVARCHAR2
542
543    def visit_VARCHAR(self, type_, **kw):
544        return self._visit_varchar(type_, '', '')
545
546    def _visit_varchar(self, type_, n, num):
547        if not type_.length:
548            return "%(n)sVARCHAR%(two)s" % {'two': num, 'n': n}
549        elif not n and self.dialect._supports_char_length:
550            varchar = "VARCHAR%(two)s(%(length)s CHAR)"
551            return varchar % {'length': type_.length, 'two': num}
552        else:
553            varchar = "%(n)sVARCHAR%(two)s(%(length)s)"
554            return varchar % {'length': type_.length, 'two': num, 'n': n}
555
556    def visit_text(self, type_, **kw):
557        return self.visit_CLOB(type_, **kw)
558
559    def visit_unicode_text(self, type_, **kw):
560        if self.dialect._supports_nchar:
561            return self.visit_NCLOB(type_, **kw)
562        else:
563            return self.visit_CLOB(type_, **kw)
564
565    def visit_large_binary(self, type_, **kw):
566        return self.visit_BLOB(type_, **kw)
567
568    def visit_big_integer(self, type_, **kw):
569        return self.visit_NUMBER(type_, precision=19, **kw)
570
571    def visit_boolean(self, type_, **kw):
572        return self.visit_SMALLINT(type_, **kw)
573
574    def visit_RAW(self, type_, **kw):
575        if type_.length:
576            return "RAW(%(length)s)" % {'length': type_.length}
577        else:
578            return "RAW"
579
580    def visit_ROWID(self, type_, **kw):
581        return "ROWID"
582
583
584class OracleCompiler(compiler.SQLCompiler):
585    """Oracle compiler modifies the lexical structure of Select
586    statements to work under non-ANSI configured Oracle databases, if
587    the use_ansi flag is False.
588    """
589
590    compound_keywords = util.update_copy(
591        compiler.SQLCompiler.compound_keywords,
592        {
593            expression.CompoundSelect.EXCEPT: 'MINUS'
594        }
595    )
596
597    def __init__(self, *args, **kwargs):
598        self.__wheres = {}
599        self._quoted_bind_names = {}
600        super(OracleCompiler, self).__init__(*args, **kwargs)
601
602    def visit_mod_binary(self, binary, operator, **kw):
603        return "mod(%s, %s)" % (self.process(binary.left, **kw),
604                                self.process(binary.right, **kw))
605
606    def visit_now_func(self, fn, **kw):
607        return "CURRENT_TIMESTAMP"
608
609    def visit_char_length_func(self, fn, **kw):
610        return "LENGTH" + self.function_argspec(fn, **kw)
611
612    def visit_match_op_binary(self, binary, operator, **kw):
613        return "CONTAINS (%s, %s)" % (self.process(binary.left),
614                                      self.process(binary.right))
615
616    def visit_true(self, expr, **kw):
617        return '1'
618
619    def visit_false(self, expr, **kw):
620        return '0'
621
622    def get_cte_preamble(self, recursive):
623        return "WITH"
624
625    def get_select_hint_text(self, byfroms):
626        return " ".join(
627            "/*+ %s */" % text for table, text in byfroms.items()
628        )
629
630    def function_argspec(self, fn, **kw):
631        if len(fn.clauses) > 0 or fn.name.upper() not in NO_ARG_FNS:
632            return compiler.SQLCompiler.function_argspec(self, fn, **kw)
633        else:
634            return ""
635
636    def default_from(self):
637        """Called when a ``SELECT`` statement has no froms,
638        and no ``FROM`` clause is to be appended.
639
640        The Oracle compiler tacks a "FROM DUAL" to the statement.
641        """
642
643        return " FROM DUAL"
644
645    def visit_join(self, join, **kwargs):
646        if self.dialect.use_ansi:
647            return compiler.SQLCompiler.visit_join(self, join, **kwargs)
648        else:
649            kwargs['asfrom'] = True
650            if isinstance(join.right, expression.FromGrouping):
651                right = join.right.element
652            else:
653                right = join.right
654            return self.process(join.left, **kwargs) + \
655                ", " + self.process(right, **kwargs)
656
657    def _get_nonansi_join_whereclause(self, froms):
658        clauses = []
659
660        def visit_join(join):
661            if join.isouter:
662                def visit_binary(binary):
663                    if binary.operator == sql_operators.eq:
664                        if join.right.is_derived_from(binary.left.table):
665                            binary.left = _OuterJoinColumn(binary.left)
666                        elif join.right.is_derived_from(binary.right.table):
667                            binary.right = _OuterJoinColumn(binary.right)
668                clauses.append(visitors.cloned_traverse(
669                    join.onclause, {}, {'binary': visit_binary}))
670            else:
671                clauses.append(join.onclause)
672
673            for j in join.left, join.right:
674                if isinstance(j, expression.Join):
675                    visit_join(j)
676                elif isinstance(j, expression.FromGrouping):
677                    visit_join(j.element)
678
679        for f in froms:
680            if isinstance(f, expression.Join):
681                visit_join(f)
682
683        if not clauses:
684            return None
685        else:
686            return sql.and_(*clauses)
687
688    def visit_outer_join_column(self, vc, **kw):
689        return self.process(vc.column, **kw) + "(+)"
690
691    def visit_sequence(self, seq):
692        return (self.dialect.identifier_preparer.format_sequence(seq) +
693                ".nextval")
694
695    def get_render_as_alias_suffix(self, alias_name_text):
696        """Oracle doesn't like ``FROM table AS alias``"""
697
698        return " " + alias_name_text
699
700    def returning_clause(self, stmt, returning_cols):
701        columns = []
702        binds = []
703        for i, column in enumerate(
704                expression._select_iterables(returning_cols)):
705            if column.type._has_column_expression:
706                col_expr = column.type.column_expression(column)
707            else:
708                col_expr = column
709            outparam = sql.outparam("ret_%d" % i, type_=column.type)
710            self.binds[outparam.key] = outparam
711            binds.append(
712                self.bindparam_string(self._truncate_bindparam(outparam)))
713            columns.append(
714                self.process(col_expr, within_columns_clause=False))
715
716            self._add_to_result_map(
717                outparam.key, outparam.key,
718                (column, getattr(column, 'name', None),
719                 getattr(column, 'key', None)),
720                column.type
721            )
722
723        return 'RETURNING ' + ', '.join(columns) + " INTO " + ", ".join(binds)
724
725    def _TODO_visit_compound_select(self, select):
726        """Need to determine how to get ``LIMIT``/``OFFSET`` into a
727        ``UNION`` for Oracle.
728        """
729        pass
730
731    def visit_select(self, select, **kwargs):
732        """Look for ``LIMIT`` and OFFSET in a select statement, and if
733        so tries to wrap it in a subquery with ``rownum`` criterion.
734        """
735
736        if not getattr(select, '_oracle_visit', None):
737            if not self.dialect.use_ansi:
738                froms = self._display_froms_for_select(
739                    select, kwargs.get('asfrom', False))
740                whereclause = self._get_nonansi_join_whereclause(froms)
741                if whereclause is not None:
742                    select = select.where(whereclause)
743                    select._oracle_visit = True
744
745            limit_clause = select._limit_clause
746            offset_clause = select._offset_clause
747            if limit_clause is not None or offset_clause is not None:
748                # See http://www.oracle.com/technology/oramag/oracle/06-sep/\
749                # o56asktom.html
750                #
751                # Generalized form of an Oracle pagination query:
752                #   select ... from (
753                #     select /*+ FIRST_ROWS(N) */ ...., rownum as ora_rn from
754                #       (  select distinct ... where ... order by ...
755                #     ) where ROWNUM <= :limit+:offset
756                #   ) where ora_rn > :offset
757                # Outer select and "ROWNUM as ora_rn" can be dropped if
758                # limit=0
759
760                kwargs['select_wraps_for'] = select
761                select = select._generate()
762                select._oracle_visit = True
763
764                # Wrap the middle select and add the hint
765                limitselect = sql.select([c for c in select.c])
766                if limit_clause is not None and \
767                    self.dialect.optimize_limits and \
768                        select._simple_int_limit:
769                    limitselect = limitselect.prefix_with(
770                        "/*+ FIRST_ROWS(%d) */" %
771                        select._limit)
772
773                limitselect._oracle_visit = True
774                limitselect._is_wrapper = True
775
776                # add expressions to accommodate FOR UPDATE OF
777                for_update = select._for_update_arg
778                if for_update is not None and for_update.of:
779                    for_update = for_update._clone()
780                    for_update._copy_internals()
781
782                    for elem in for_update.of:
783                        select.append_column(elem)
784
785                    adapter = sql_util.ClauseAdapter(select)
786                    for_update.of = [
787                        adapter.traverse(elem)
788                        for elem in for_update.of]
789
790                # If needed, add the limiting clause
791                if limit_clause is not None:
792                    if not self.dialect.use_binds_for_limits:
793                        # use simple int limits, will raise an exception
794                        # if the limit isn't specified this way
795                        max_row = select._limit
796
797                        if offset_clause is not None:
798                            max_row += select._offset
799                        max_row = sql.literal_column("%d" % max_row)
800                    else:
801                        max_row = limit_clause
802                        if offset_clause is not None:
803                            max_row = max_row + offset_clause
804                    limitselect.append_whereclause(
805                        sql.literal_column("ROWNUM") <= max_row)
806
807                # If needed, add the ora_rn, and wrap again with offset.
808                if offset_clause is None:
809                    limitselect._for_update_arg = for_update
810                    select = limitselect
811                else:
812                    limitselect = limitselect.column(
813                        sql.literal_column("ROWNUM").label("ora_rn"))
814                    limitselect._oracle_visit = True
815                    limitselect._is_wrapper = True
816
817                    offsetselect = sql.select(
818                        [c for c in limitselect.c if c.key != 'ora_rn'])
819                    offsetselect._oracle_visit = True
820                    offsetselect._is_wrapper = True
821
822                    if for_update is not None and for_update.of:
823                        for elem in for_update.of:
824                            if limitselect.corresponding_column(elem) is None:
825                                limitselect.append_column(elem)
826
827                    if not self.dialect.use_binds_for_limits:
828                        offset_clause = sql.literal_column(
829                            "%d" % select._offset)
830                    offsetselect.append_whereclause(
831                        sql.literal_column("ora_rn") > offset_clause)
832
833                    offsetselect._for_update_arg = for_update
834                    select = offsetselect
835
836        return compiler.SQLCompiler.visit_select(self, select, **kwargs)
837
838    def limit_clause(self, select, **kw):
839        return ""
840
841    def for_update_clause(self, select, **kw):
842        if self.is_subquery():
843            return ""
844
845        tmp = ' FOR UPDATE'
846
847        if select._for_update_arg.of:
848            tmp += ' OF ' + ', '.join(
849                self.process(elem, **kw) for elem in
850                select._for_update_arg.of
851            )
852
853        if select._for_update_arg.nowait:
854            tmp += " NOWAIT"
855        if select._for_update_arg.skip_locked:
856            tmp += " SKIP LOCKED"
857
858        return tmp
859
860
861class OracleDDLCompiler(compiler.DDLCompiler):
862
863    def define_constraint_cascades(self, constraint):
864        text = ""
865        if constraint.ondelete is not None:
866            text += " ON DELETE %s" % constraint.ondelete
867
868        # oracle has no ON UPDATE CASCADE -
869        # its only available via triggers
870        # http://asktom.oracle.com/tkyte/update_cascade/index.html
871        if constraint.onupdate is not None:
872            util.warn(
873                "Oracle does not contain native UPDATE CASCADE "
874                "functionality - onupdates will not be rendered for foreign "
875                "keys.  Consider using deferrable=True, initially='deferred' "
876                "or triggers.")
877
878        return text
879
880    def visit_create_index(self, create):
881        index = create.element
882        self._verify_index_table(index)
883        preparer = self.preparer
884        text = "CREATE "
885        if index.unique:
886            text += "UNIQUE "
887        if index.dialect_options['oracle']['bitmap']:
888            text += "BITMAP "
889        text += "INDEX %s ON %s (%s)" % (
890            self._prepared_index_name(index, include_schema=True),
891            preparer.format_table(index.table, use_schema=True),
892            ', '.join(
893                self.sql_compiler.process(
894                    expr,
895                    include_table=False, literal_binds=True)
896                for expr in index.expressions)
897        )
898        if index.dialect_options['oracle']['compress'] is not False:
899            if index.dialect_options['oracle']['compress'] is True:
900                text += " COMPRESS"
901            else:
902                text += " COMPRESS %d" % (
903                    index.dialect_options['oracle']['compress']
904                )
905        return text
906
907    def post_create_table(self, table):
908        table_opts = []
909        opts = table.dialect_options['oracle']
910
911        if opts['on_commit']:
912            on_commit_options = opts['on_commit'].replace("_", " ").upper()
913            table_opts.append('\n ON COMMIT %s' % on_commit_options)
914
915        if opts['compress']:
916            if opts['compress'] is True:
917                table_opts.append("\n COMPRESS")
918            else:
919                table_opts.append("\n COMPRESS FOR %s" % (
920                    opts['compress']
921                ))
922
923        return ''.join(table_opts)
924
925
926class OracleIdentifierPreparer(compiler.IdentifierPreparer):
927
928    reserved_words = set([x.lower() for x in RESERVED_WORDS])
929    illegal_initial_characters = set(
930        (str(dig) for dig in range(0, 10))).union(["_", "$"])
931
932    def _bindparam_requires_quotes(self, value):
933        """Return True if the given identifier requires quoting."""
934        lc_value = value.lower()
935        return (lc_value in self.reserved_words
936                or value[0] in self.illegal_initial_characters
937                or not self.legal_characters.match(util.text_type(value))
938                )
939
940    def format_savepoint(self, savepoint):
941        name = savepoint.ident.lstrip('_')
942        return super(
943            OracleIdentifierPreparer, self).format_savepoint(savepoint, name)
944
945
946class OracleExecutionContext(default.DefaultExecutionContext):
947    def fire_sequence(self, seq, type_):
948        return self._execute_scalar(
949            "SELECT " +
950            self.dialect.identifier_preparer.format_sequence(seq) +
951            ".nextval FROM DUAL", type_)
952
953
954class OracleDialect(default.DefaultDialect):
955    name = 'oracle'
956    supports_alter = True
957    supports_unicode_statements = False
958    supports_unicode_binds = False
959    max_identifier_length = 30
960    supports_sane_rowcount = True
961    supports_sane_multi_rowcount = False
962
963    supports_simple_order_by_label = False
964
965    supports_sequences = True
966    sequences_optional = False
967    postfetch_lastrowid = False
968
969    default_paramstyle = 'named'
970    colspecs = colspecs
971    ischema_names = ischema_names
972    requires_name_normalize = True
973
974    supports_default_values = False
975    supports_empty_insert = False
976
977    statement_compiler = OracleCompiler
978    ddl_compiler = OracleDDLCompiler
979    type_compiler = OracleTypeCompiler
980    preparer = OracleIdentifierPreparer
981    execution_ctx_cls = OracleExecutionContext
982
983    reflection_options = ('oracle_resolve_synonyms', )
984
985    construct_arguments = [
986        (sa_schema.Table, {
987            "resolve_synonyms": False,
988            "on_commit": None,
989            "compress": False
990        }),
991        (sa_schema.Index, {
992            "bitmap": False,
993            "compress": False
994        })
995    ]
996
997    def __init__(self,
998                 use_ansi=True,
999                 optimize_limits=False,
1000                 use_binds_for_limits=True,
1001                 exclude_tablespaces=('SYSTEM', 'SYSAUX', ),
1002                 **kwargs):
1003        default.DefaultDialect.__init__(self, **kwargs)
1004        self.use_ansi = use_ansi
1005        self.optimize_limits = optimize_limits
1006        self.use_binds_for_limits = use_binds_for_limits
1007        self.exclude_tablespaces = exclude_tablespaces
1008
1009    def initialize(self, connection):
1010        super(OracleDialect, self).initialize(connection)
1011        self.implicit_returning = self.__dict__.get(
1012            'implicit_returning',
1013            self.server_version_info > (10, )
1014        )
1015
1016        if self._is_oracle_8:
1017            self.colspecs = self.colspecs.copy()
1018            self.colspecs.pop(sqltypes.Interval)
1019            self.use_ansi = False
1020
1021    @property
1022    def _is_oracle_8(self):
1023        return self.server_version_info and \
1024            self.server_version_info < (9, )
1025
1026    @property
1027    def _supports_table_compression(self):
1028        return self.server_version_info and \
1029            self.server_version_info >= (10, 1, )
1030
1031    @property
1032    def _supports_table_compress_for(self):
1033        return self.server_version_info and \
1034            self.server_version_info >= (11, )
1035
1036    @property
1037    def _supports_char_length(self):
1038        return not self._is_oracle_8
1039
1040    @property
1041    def _supports_nchar(self):
1042        return not self._is_oracle_8
1043
1044    def do_release_savepoint(self, connection, name):
1045        # Oracle does not support RELEASE SAVEPOINT
1046        pass
1047
1048    def has_table(self, connection, table_name, schema=None):
1049        if not schema:
1050            schema = self.default_schema_name
1051        cursor = connection.execute(
1052            sql.text("SELECT table_name FROM all_tables "
1053                     "WHERE table_name = :name AND owner = :schema_name"),
1054            name=self.denormalize_name(table_name),
1055            schema_name=self.denormalize_name(schema))
1056        return cursor.first() is not None
1057
1058    def has_sequence(self, connection, sequence_name, schema=None):
1059        if not schema:
1060            schema = self.default_schema_name
1061        cursor = connection.execute(
1062            sql.text("SELECT sequence_name FROM all_sequences "
1063                     "WHERE sequence_name = :name AND "
1064                     "sequence_owner = :schema_name"),
1065            name=self.denormalize_name(sequence_name),
1066            schema_name=self.denormalize_name(schema))
1067        return cursor.first() is not None
1068
1069    def normalize_name(self, name):
1070        if name is None:
1071            return None
1072        if util.py2k:
1073            if isinstance(name, str):
1074                name = name.decode(self.encoding)
1075        if name.upper() == name and not \
1076                self.identifier_preparer._requires_quotes(name.lower()):
1077            return name.lower()
1078        elif name.lower() == name:
1079            return quoted_name(name, quote=True)
1080        else:
1081            return name
1082
1083    def denormalize_name(self, name):
1084        if name is None:
1085            return None
1086        elif name.lower() == name and not \
1087                self.identifier_preparer._requires_quotes(name.lower()):
1088            name = name.upper()
1089        if util.py2k:
1090            if not self.supports_unicode_binds:
1091                name = name.encode(self.encoding)
1092            else:
1093                name = unicode(name)
1094        return name
1095
1096    def _get_default_schema_name(self, connection):
1097        return self.normalize_name(
1098            connection.execute('SELECT USER FROM DUAL').scalar())
1099
1100    def _resolve_synonym(self, connection, desired_owner=None,
1101                         desired_synonym=None, desired_table=None):
1102        """search for a local synonym matching the given desired owner/name.
1103
1104        if desired_owner is None, attempts to locate a distinct owner.
1105
1106        returns the actual name, owner, dblink name, and synonym name if
1107        found.
1108        """
1109
1110        q = "SELECT owner, table_owner, table_name, db_link, "\
1111            "synonym_name FROM all_synonyms WHERE "
1112        clauses = []
1113        params = {}
1114        if desired_synonym:
1115            clauses.append("synonym_name = :synonym_name")
1116            params['synonym_name'] = desired_synonym
1117        if desired_owner:
1118            clauses.append("owner = :desired_owner")
1119            params['desired_owner'] = desired_owner
1120        if desired_table:
1121            clauses.append("table_name = :tname")
1122            params['tname'] = desired_table
1123
1124        q += " AND ".join(clauses)
1125
1126        result = connection.execute(sql.text(q), **params)
1127        if desired_owner:
1128            row = result.first()
1129            if row:
1130                return (row['table_name'], row['table_owner'],
1131                        row['db_link'], row['synonym_name'])
1132            else:
1133                return None, None, None, None
1134        else:
1135            rows = result.fetchall()
1136            if len(rows) > 1:
1137                raise AssertionError(
1138                    "There are multiple tables visible to the schema, you "
1139                    "must specify owner")
1140            elif len(rows) == 1:
1141                row = rows[0]
1142                return (row['table_name'], row['table_owner'],
1143                        row['db_link'], row['synonym_name'])
1144            else:
1145                return None, None, None, None
1146
1147    @reflection.cache
1148    def _prepare_reflection_args(self, connection, table_name, schema=None,
1149                                 resolve_synonyms=False, dblink='', **kw):
1150
1151        if resolve_synonyms:
1152            actual_name, owner, dblink, synonym = self._resolve_synonym(
1153                connection,
1154                desired_owner=self.denormalize_name(schema),
1155                desired_synonym=self.denormalize_name(table_name)
1156            )
1157        else:
1158            actual_name, owner, dblink, synonym = None, None, None, None
1159        if not actual_name:
1160            actual_name = self.denormalize_name(table_name)
1161
1162        if dblink:
1163            # using user_db_links here since all_db_links appears
1164            # to have more restricted permissions.
1165            # http://docs.oracle.com/cd/B28359_01/server.111/b28310/ds_admin005.htm
1166            # will need to hear from more users if we are doing
1167            # the right thing here.  See [ticket:2619]
1168            owner = connection.scalar(
1169                sql.text("SELECT username FROM user_db_links "
1170                         "WHERE db_link=:link"), link=dblink)
1171            dblink = "@" + dblink
1172        elif not owner:
1173            owner = self.denormalize_name(schema or self.default_schema_name)
1174
1175        return (actual_name, owner, dblink or '', synonym)
1176
1177    @reflection.cache
1178    def get_schema_names(self, connection, **kw):
1179        s = "SELECT username FROM all_users ORDER BY username"
1180        cursor = connection.execute(s,)
1181        return [self.normalize_name(row[0]) for row in cursor]
1182
1183    @reflection.cache
1184    def get_table_names(self, connection, schema=None, **kw):
1185        schema = self.denormalize_name(schema or self.default_schema_name)
1186
1187        # note that table_names() isn't loading DBLINKed or synonym'ed tables
1188        if schema is None:
1189            schema = self.default_schema_name
1190
1191        sql_str = "SELECT table_name FROM all_tables WHERE "
1192        if self.exclude_tablespaces:
1193            sql_str += (
1194                "nvl(tablespace_name, 'no tablespace') "
1195                "NOT IN (%s) AND " % (
1196                    ', '.join(["'%s'" % ts for ts in self.exclude_tablespaces])
1197                )
1198            )
1199        sql_str += (
1200            "OWNER = :owner "
1201            "AND IOT_NAME IS NULL "
1202            "AND DURATION IS NULL")
1203
1204        cursor = connection.execute(sql.text(sql_str), owner=schema)
1205        return [self.normalize_name(row[0]) for row in cursor]
1206
1207    @reflection.cache
1208    def get_temp_table_names(self, connection, **kw):
1209        schema = self.denormalize_name(self.default_schema_name)
1210
1211        sql_str = "SELECT table_name FROM all_tables WHERE "
1212        if self.exclude_tablespaces:
1213            sql_str += (
1214                "nvl(tablespace_name, 'no tablespace') "
1215                "NOT IN (%s) AND " % (
1216                    ', '.join(["'%s'" % ts for ts in self.exclude_tablespaces])
1217                )
1218            )
1219        sql_str += (
1220            "OWNER = :owner "
1221            "AND IOT_NAME IS NULL "
1222            "AND DURATION IS NOT NULL")
1223
1224        cursor = connection.execute(sql.text(sql_str), owner=schema)
1225        return [self.normalize_name(row[0]) for row in cursor]
1226
1227    @reflection.cache
1228    def get_view_names(self, connection, schema=None, **kw):
1229        schema = self.denormalize_name(schema or self.default_schema_name)
1230        s = sql.text("SELECT view_name FROM all_views WHERE owner = :owner")
1231        cursor = connection.execute(s, owner=self.denormalize_name(schema))
1232        return [self.normalize_name(row[0]) for row in cursor]
1233
1234    @reflection.cache
1235    def get_table_options(self, connection, table_name, schema=None, **kw):
1236        options = {}
1237
1238        resolve_synonyms = kw.get('oracle_resolve_synonyms', False)
1239        dblink = kw.get('dblink', '')
1240        info_cache = kw.get('info_cache')
1241
1242        (table_name, schema, dblink, synonym) = \
1243            self._prepare_reflection_args(connection, table_name, schema,
1244                                          resolve_synonyms, dblink,
1245                                          info_cache=info_cache)
1246
1247        params = {"table_name": table_name}
1248
1249        columns = ["table_name"]
1250        if self._supports_table_compression:
1251            columns.append("compression")
1252        if self._supports_table_compress_for:
1253            columns.append("compress_for")
1254
1255        text = "SELECT %(columns)s "\
1256            "FROM ALL_TABLES%(dblink)s "\
1257            "WHERE table_name = :table_name"
1258
1259        if schema is not None:
1260            params['owner'] = schema
1261            text += " AND owner = :owner "
1262        text = text % {'dblink': dblink, 'columns': ", ".join(columns)}
1263
1264        result = connection.execute(sql.text(text), **params)
1265
1266        enabled = dict(DISABLED=False, ENABLED=True)
1267
1268        row = result.first()
1269        if row:
1270            if "compression" in row and enabled.get(row.compression, False):
1271                if "compress_for" in row:
1272                    options['oracle_compress'] = row.compress_for
1273                else:
1274                    options['oracle_compress'] = True
1275
1276        return options
1277
1278    @reflection.cache
1279    def get_columns(self, connection, table_name, schema=None, **kw):
1280        """
1281
1282        kw arguments can be:
1283
1284            oracle_resolve_synonyms
1285
1286            dblink
1287
1288        """
1289
1290        resolve_synonyms = kw.get('oracle_resolve_synonyms', False)
1291        dblink = kw.get('dblink', '')
1292        info_cache = kw.get('info_cache')
1293
1294        (table_name, schema, dblink, synonym) = \
1295            self._prepare_reflection_args(connection, table_name, schema,
1296                                          resolve_synonyms, dblink,
1297                                          info_cache=info_cache)
1298        columns = []
1299        if self._supports_char_length:
1300            char_length_col = 'char_length'
1301        else:
1302            char_length_col = 'data_length'
1303
1304        params = {"table_name": table_name}
1305        text = "SELECT column_name, data_type, %(char_length_col)s, "\
1306            "data_precision, data_scale, "\
1307            "nullable, data_default FROM ALL_TAB_COLUMNS%(dblink)s "\
1308            "WHERE table_name = :table_name"
1309        if schema is not None:
1310            params['owner'] = schema
1311            text += " AND owner = :owner "
1312        text += " ORDER BY column_id"
1313        text = text % {'dblink': dblink, 'char_length_col': char_length_col}
1314
1315        c = connection.execute(sql.text(text), **params)
1316
1317        for row in c:
1318            (colname, orig_colname, coltype, length, precision, scale, nullable, default) = \
1319                (self.normalize_name(row[0]), row[0], row[1], row[
1320                 2], row[3], row[4], row[5] == 'Y', row[6])
1321
1322            if coltype == 'NUMBER':
1323                coltype = NUMBER(precision, scale)
1324            elif coltype in ('VARCHAR2', 'NVARCHAR2', 'CHAR'):
1325                coltype = self.ischema_names.get(coltype)(length)
1326            elif 'WITH TIME ZONE' in coltype:
1327                coltype = TIMESTAMP(timezone=True)
1328            else:
1329                coltype = re.sub(r'\(\d+\)', '', coltype)
1330                try:
1331                    coltype = self.ischema_names[coltype]
1332                except KeyError:
1333                    util.warn("Did not recognize type '%s' of column '%s'" %
1334                              (coltype, colname))
1335                    coltype = sqltypes.NULLTYPE
1336
1337            cdict = {
1338                'name': colname,
1339                'type': coltype,
1340                'nullable': nullable,
1341                'default': default,
1342                'autoincrement': 'auto',
1343            }
1344            if orig_colname.lower() == orig_colname:
1345                cdict['quote'] = True
1346
1347            columns.append(cdict)
1348        return columns
1349
1350    @reflection.cache
1351    def get_indexes(self, connection, table_name, schema=None,
1352                    resolve_synonyms=False, dblink='', **kw):
1353
1354        info_cache = kw.get('info_cache')
1355        (table_name, schema, dblink, synonym) = \
1356            self._prepare_reflection_args(connection, table_name, schema,
1357                                          resolve_synonyms, dblink,
1358                                          info_cache=info_cache)
1359        indexes = []
1360
1361        params = {'table_name': table_name}
1362        text = \
1363            "SELECT a.index_name, a.column_name, "\
1364            "\nb.index_type, b.uniqueness, b.compression, b.prefix_length "\
1365            "\nFROM ALL_IND_COLUMNS%(dblink)s a, "\
1366            "\nALL_INDEXES%(dblink)s b "\
1367            "\nWHERE "\
1368            "\na.index_name = b.index_name "\
1369            "\nAND a.table_owner = b.table_owner "\
1370            "\nAND a.table_name = b.table_name "\
1371            "\nAND a.table_name = :table_name "
1372
1373        if schema is not None:
1374            params['schema'] = schema
1375            text += "AND a.table_owner = :schema "
1376
1377        text += "ORDER BY a.index_name, a.column_position"
1378
1379        text = text % {'dblink': dblink}
1380
1381        q = sql.text(text)
1382        rp = connection.execute(q, **params)
1383        indexes = []
1384        last_index_name = None
1385        pk_constraint = self.get_pk_constraint(
1386            connection, table_name, schema, resolve_synonyms=resolve_synonyms,
1387            dblink=dblink, info_cache=kw.get('info_cache'))
1388        pkeys = pk_constraint['constrained_columns']
1389        uniqueness = dict(NONUNIQUE=False, UNIQUE=True)
1390        enabled = dict(DISABLED=False, ENABLED=True)
1391
1392        oracle_sys_col = re.compile(r'SYS_NC\d+\$', re.IGNORECASE)
1393
1394        def upper_name_set(names):
1395            return set([i.upper() for i in names])
1396
1397        pk_names = upper_name_set(pkeys)
1398
1399        def remove_if_primary_key(index):
1400            # don't include the primary key index
1401            if index is not None and \
1402               upper_name_set(index['column_names']) == pk_names:
1403                indexes.pop()
1404
1405        index = None
1406        for rset in rp:
1407            if rset.index_name != last_index_name:
1408                remove_if_primary_key(index)
1409                index = dict(name=self.normalize_name(rset.index_name),
1410                             column_names=[], dialect_options={})
1411                indexes.append(index)
1412            index['unique'] = uniqueness.get(rset.uniqueness, False)
1413
1414            if rset.index_type in ('BITMAP', 'FUNCTION-BASED BITMAP'):
1415                index['dialect_options']['oracle_bitmap'] = True
1416            if enabled.get(rset.compression, False):
1417                index['dialect_options']['oracle_compress'] = rset.prefix_length
1418
1419            # filter out Oracle SYS_NC names.  could also do an outer join
1420            # to the all_tab_columns table and check for real col names there.
1421            if not oracle_sys_col.match(rset.column_name):
1422                index['column_names'].append(
1423                    self.normalize_name(rset.column_name))
1424            last_index_name = rset.index_name
1425        remove_if_primary_key(index)
1426        return indexes
1427
1428    @reflection.cache
1429    def _get_constraint_data(self, connection, table_name, schema=None,
1430                             dblink='', **kw):
1431
1432        params = {'table_name': table_name}
1433
1434        text = \
1435            "SELECT"\
1436            "\nac.constraint_name,"\
1437            "\nac.constraint_type,"\
1438            "\nloc.column_name AS local_column,"\
1439            "\nrem.table_name AS remote_table,"\
1440            "\nrem.column_name AS remote_column,"\
1441            "\nrem.owner AS remote_owner,"\
1442            "\nloc.position as loc_pos,"\
1443            "\nrem.position as rem_pos"\
1444            "\nFROM all_constraints%(dblink)s ac,"\
1445            "\nall_cons_columns%(dblink)s loc,"\
1446            "\nall_cons_columns%(dblink)s rem"\
1447            "\nWHERE ac.table_name = :table_name"\
1448            "\nAND ac.constraint_type IN ('R','P')"
1449
1450        if schema is not None:
1451            params['owner'] = schema
1452            text += "\nAND ac.owner = :owner"
1453
1454        text += \
1455            "\nAND ac.owner = loc.owner"\
1456            "\nAND ac.constraint_name = loc.constraint_name"\
1457            "\nAND ac.r_owner = rem.owner(+)"\
1458            "\nAND ac.r_constraint_name = rem.constraint_name(+)"\
1459            "\nAND (rem.position IS NULL or loc.position=rem.position)"\
1460            "\nORDER BY ac.constraint_name, loc.position"
1461
1462        text = text % {'dblink': dblink}
1463        rp = connection.execute(sql.text(text), **params)
1464        constraint_data = rp.fetchall()
1465        return constraint_data
1466
1467    @reflection.cache
1468    def get_pk_constraint(self, connection, table_name, schema=None, **kw):
1469        resolve_synonyms = kw.get('oracle_resolve_synonyms', False)
1470        dblink = kw.get('dblink', '')
1471        info_cache = kw.get('info_cache')
1472
1473        (table_name, schema, dblink, synonym) = \
1474            self._prepare_reflection_args(connection, table_name, schema,
1475                                          resolve_synonyms, dblink,
1476                                          info_cache=info_cache)
1477        pkeys = []
1478        constraint_name = None
1479        constraint_data = self._get_constraint_data(
1480            connection, table_name, schema, dblink,
1481            info_cache=kw.get('info_cache'))
1482
1483        for row in constraint_data:
1484            (cons_name, cons_type, local_column, remote_table, remote_column, remote_owner) = \
1485                row[0:2] + tuple([self.normalize_name(x) for x in row[2:6]])
1486            if cons_type == 'P':
1487                if constraint_name is None:
1488                    constraint_name = self.normalize_name(cons_name)
1489                pkeys.append(local_column)
1490        return {'constrained_columns': pkeys, 'name': constraint_name}
1491
1492    @reflection.cache
1493    def get_foreign_keys(self, connection, table_name, schema=None, **kw):
1494        """
1495
1496        kw arguments can be:
1497
1498            oracle_resolve_synonyms
1499
1500            dblink
1501
1502        """
1503
1504        requested_schema = schema  # to check later on
1505        resolve_synonyms = kw.get('oracle_resolve_synonyms', False)
1506        dblink = kw.get('dblink', '')
1507        info_cache = kw.get('info_cache')
1508
1509        (table_name, schema, dblink, synonym) = \
1510            self._prepare_reflection_args(connection, table_name, schema,
1511                                          resolve_synonyms, dblink,
1512                                          info_cache=info_cache)
1513
1514        constraint_data = self._get_constraint_data(
1515            connection, table_name, schema, dblink,
1516            info_cache=kw.get('info_cache'))
1517
1518        def fkey_rec():
1519            return {
1520                'name': None,
1521                'constrained_columns': [],
1522                'referred_schema': None,
1523                'referred_table': None,
1524                'referred_columns': []
1525            }
1526
1527        fkeys = util.defaultdict(fkey_rec)
1528
1529        for row in constraint_data:
1530            (cons_name, cons_type, local_column, remote_table, remote_column, remote_owner) = \
1531                row[0:2] + tuple([self.normalize_name(x) for x in row[2:6]])
1532
1533            if cons_type == 'R':
1534                if remote_table is None:
1535                    # ticket 363
1536                    util.warn(
1537                        ("Got 'None' querying 'table_name' from "
1538                         "all_cons_columns%(dblink)s - does the user have "
1539                         "proper rights to the table?") % {'dblink': dblink})
1540                    continue
1541
1542                rec = fkeys[cons_name]
1543                rec['name'] = cons_name
1544                local_cols, remote_cols = rec[
1545                    'constrained_columns'], rec['referred_columns']
1546
1547                if not rec['referred_table']:
1548                    if resolve_synonyms:
1549                        ref_remote_name, ref_remote_owner, ref_dblink, ref_synonym = \
1550                            self._resolve_synonym(
1551                                connection,
1552                                desired_owner=self.denormalize_name(
1553                                    remote_owner),
1554                                desired_table=self.denormalize_name(
1555                                    remote_table)
1556                            )
1557                        if ref_synonym:
1558                            remote_table = self.normalize_name(ref_synonym)
1559                            remote_owner = self.normalize_name(
1560                                ref_remote_owner)
1561
1562                    rec['referred_table'] = remote_table
1563
1564                    if requested_schema is not None or \
1565                       self.denormalize_name(remote_owner) != schema:
1566                        rec['referred_schema'] = remote_owner
1567
1568                local_cols.append(local_column)
1569                remote_cols.append(remote_column)
1570
1571        return list(fkeys.values())
1572
1573    @reflection.cache
1574    def get_view_definition(self, connection, view_name, schema=None,
1575                            resolve_synonyms=False, dblink='', **kw):
1576        info_cache = kw.get('info_cache')
1577        (view_name, schema, dblink, synonym) = \
1578            self._prepare_reflection_args(connection, view_name, schema,
1579                                          resolve_synonyms, dblink,
1580                                          info_cache=info_cache)
1581
1582        params = {'view_name': view_name}
1583        text = "SELECT text FROM all_views WHERE view_name=:view_name"
1584
1585        if schema is not None:
1586            text += " AND owner = :schema"
1587            params['schema'] = schema
1588
1589        rp = connection.execute(sql.text(text), **params).scalar()
1590        if rp:
1591            if util.py2k:
1592                rp = rp.decode(self.encoding)
1593            return rp
1594        else:
1595            return None
1596
1597
1598class _OuterJoinColumn(sql.ClauseElement):
1599    __visit_name__ = 'outer_join_column'
1600
1601    def __init__(self, column):
1602        self.column = column
1603