# mssql/base.py
# Copyright (C) 2005-2021 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
"""
.. dialect:: mssql
    :name: Microsoft SQL Server

.. _mssql_external_dialects:

External Dialects
-----------------

In addition to the above DBAPI layers with native SQLAlchemy support, there
are third-party dialects for other DBAPI layers that are compatible
with SQL Server. See the "External Dialects" list on the
:ref:`dialect_toplevel` page.

.. _mssql_identity:

Auto Increment Behavior / IDENTITY Columns
------------------------------------------

SQL Server provides so-called "auto incrementing" behavior using the
``IDENTITY`` construct, which can be placed on any single integer column in a
table. SQLAlchemy considers ``IDENTITY`` within its default "autoincrement"
behavior for an integer primary key column, described at
:paramref:`_schema.Column.autoincrement`.  This means that by default,
the first
integer primary key column in a :class:`_schema.Table`
will be considered to be the
identity column and will generate DDL as such::

    from sqlalchemy import Table, MetaData, Column, Integer

    m = MetaData()
    t = Table('t', m,
            Column('id', Integer, primary_key=True),
            Column('x', Integer))
    m.create_all(engine)

The above example will generate DDL as:

.. sourcecode:: sql

    CREATE TABLE t (
        id INTEGER NOT NULL IDENTITY(1,1),
        x INTEGER NULL,
        PRIMARY KEY (id)
    )

For the case where this default generation of ``IDENTITY`` is not desired,
specify ``False`` for the :paramref:`_schema.Column.autoincrement` flag,
on the first integer primary key column::

    m = MetaData()
    t = Table('t', m,
            Column('id', Integer, primary_key=True, autoincrement=False),
            Column('x', Integer))
    m.create_all(engine)

To add the ``IDENTITY`` keyword to a non-primary key column, specify
``True`` for the :paramref:`_schema.Column.autoincrement` flag on the desired
:class:`_schema.Column` object, and ensure that
:paramref:`_schema.Column.autoincrement`
is set to ``False`` on any integer primary key column::

    m = MetaData()
    t = Table('t', m,
            Column('id', Integer, primary_key=True, autoincrement=False),
            Column('x', Integer, autoincrement=True))
    m.create_all(engine)

.. versionchanged::  1.3   Added ``mssql_identity_start`` and
   ``mssql_identity_increment`` parameters to :class:`_schema.Column`.
   These replace
   the use of the :class:`.Sequence` object in order to specify these values.

.. deprecated:: 1.3

   The use of :class:`.Sequence` to specify IDENTITY characteristics is
   deprecated and will be removed in a future release.   Please use
   the ``mssql_identity_start`` and ``mssql_identity_increment`` parameters
   documented at :ref:`mssql_identity`.

.. note::

    There can only be one IDENTITY column on the table.  When using
    ``autoincrement=True`` to enable the IDENTITY keyword, SQLAlchemy does not
    guard against multiple columns specifying the option simultaneously.  The
    SQL Server database will instead reject the ``CREATE TABLE`` statement.

.. note::

    An INSERT statement which attempts to provide a value for a column that is
    marked with IDENTITY will be rejected by SQL Server.   In order for the
    value to be accepted, a session-level option "SET IDENTITY_INSERT" must be
    enabled.   The SQLAlchemy SQL Server dialect will perform this operation
    automatically when using a core :class:`_expression.Insert`
    construct; if the
    execution specifies a value for the IDENTITY column, the "IDENTITY_INSERT"
    option will be enabled for the span of that statement's invocation.
    However, this scenario is not high performing and should not be relied
    upon for normal use.   If a table doesn't actually require IDENTITY
    behavior in its integer primary key column, the keyword should be
    disabled when creating the table by ensuring that
    ``autoincrement=False`` is set.

Controlling "Start" and "Increment"
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Specific control over the "start" and "increment" values for
the ``IDENTITY`` generator is provided using the
``mssql_identity_start`` and ``mssql_identity_increment`` parameters
passed to the :class:`_schema.Column` object::

    from sqlalchemy import Table, Integer, String, Column

    test = Table(
        'test', metadata,
        Column(
            'id', Integer, primary_key=True, mssql_identity_start=100,
            mssql_identity_increment=10
        ),
        Column('name', String(20))
    )

The CREATE TABLE for the above :class:`_schema.Table` object would be:

.. sourcecode:: sql

   CREATE TABLE test (
     id INTEGER NOT NULL IDENTITY(100,10) PRIMARY KEY,
     name VARCHAR(20) NULL
   )

.. versionchanged:: 1.3  The ``mssql_identity_start`` and
   ``mssql_identity_increment`` parameters are now used to affect the
   ``IDENTITY`` generator for a :class:`_schema.Column` under SQL Server.
   Previously, the :class:`.Sequence` object was used.  As SQL Server now
   supports real sequences as a separate construct, :class:`.Sequence` will be
   functional in the normal way in a future SQLAlchemy version.

INSERT behavior
^^^^^^^^^^^^^^^^

Handling of the ``IDENTITY`` column at INSERT time involves two key
techniques. The most common is being able to fetch the "last inserted value"
for a given ``IDENTITY`` column, a process which SQLAlchemy performs
implicitly in many cases, most importantly within the ORM.

The process for fetching this value has several variants:

* In the vast majority of cases, RETURNING is used in conjunction with INSERT
  statements on SQL Server in order to get newly generated primary key values:

  .. sourcecode:: sql

    INSERT INTO t (x) OUTPUT inserted.id VALUES (?)

* When RETURNING is not available or has been disabled via
  ``implicit_returning=False``, either the ``scope_identity()`` function or
  the ``@@identity`` variable is used; behavior varies by backend:

  * when using PyODBC, the phrase ``; select scope_identity()`` will be
    appended to the end of the INSERT statement; a second result set will be
    fetched in order to receive the value.  Given a table as::

        t = Table('t', m, Column('id', Integer, primary_key=True),
                Column('x', Integer),
                implicit_returning=False)

    an INSERT will look like:

    .. sourcecode:: sql

        INSERT INTO t (x) VALUES (?); select scope_identity()
  * Other dialects such as pymssql will call upon
    ``SELECT scope_identity() AS lastrowid`` subsequent to an INSERT
    statement. If the flag ``use_scope_identity=False`` is passed to
    :func:`_sa.create_engine`,
    the statement ``SELECT @@identity AS lastrowid``
    is used instead, as sketched below.

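
For example, to force the ``@@identity`` approach engine-wide (a minimal
sketch; the hostname, credentials and database name are placeholders)::

    engine = create_engine(
        "mssql+pymssql://scott:tiger@hostname/dbname",
        use_scope_identity=False
    )
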
A table that contains an ``IDENTITY`` column will prohibit an INSERT statement
that refers to the identity column explicitly.  The SQLAlchemy dialect will
detect when an INSERT construct, created using a core
:func:`_expression.insert`
construct (not a plain string SQL), refers to the identity column, and
in this case will emit ``SET IDENTITY_INSERT ON`` prior to the insert
statement proceeding, and ``SET IDENTITY_INSERT OFF`` subsequent to the
execution.  Given this example::

    m = MetaData()
    t = Table('t', m, Column('id', Integer, primary_key=True),
                    Column('x', Integer))
    m.create_all(engine)

    with engine.begin() as conn:
        conn.execute(t.insert(), {'id': 1, 'x': 1}, {'id': 2, 'x': 2})

The above column will be created with IDENTITY; however, the INSERT statement
we emit specifies explicit values.  In the echo output we can see
how SQLAlchemy handles this:

.. sourcecode:: sql

    CREATE TABLE t (
        id INTEGER NOT NULL IDENTITY(1,1),
        x INTEGER NULL,
        PRIMARY KEY (id)
    )

    COMMIT
    SET IDENTITY_INSERT t ON
    INSERT INTO t (id, x) VALUES (?, ?)
    ((1, 1), (2, 2))
    SET IDENTITY_INSERT t OFF
    COMMIT

This is an auxiliary use case suitable for testing and bulk insert scenarios.

MAX on VARCHAR / NVARCHAR
-------------------------

SQL Server supports the special string "MAX" within the
:class:`_types.VARCHAR` and :class:`_types.NVARCHAR` datatypes,
to indicate "maximum length possible".   The dialect currently handles this as
a length of "None" in the base type, rather than supplying a
dialect-specific version of these types, so that a base type
specified such as ``VARCHAR(None)`` can assume "unlengthed" behavior on
more than one backend without using dialect-specific types.

To build a SQL Server VARCHAR or NVARCHAR with MAX length, use None::

    my_table = Table(
        'my_table', metadata,
        Column('my_data', VARCHAR(None)),
        Column('my_n_data', NVARCHAR(None))
    )


Collation Support
-----------------

Character collations are supported by the base string types,
specified by the string argument "collation"::

    from sqlalchemy import VARCHAR
    Column('login', VARCHAR(32, collation='Latin1_General_CI_AS'))

When such a column is associated with a :class:`_schema.Table`, the
CREATE TABLE statement for this column will yield::

    login VARCHAR(32) COLLATE Latin1_General_CI_AS NULL

LIMIT/OFFSET Support
--------------------

MSSQL has no support for the LIMIT or OFFSET keywords. LIMIT is
supported directly through the ``TOP`` Transact-SQL keyword::

    select.limit(n)

will yield::

    SELECT TOP n

If using SQL Server 2005 or above, LIMIT with OFFSET
support is available through the ``ROW_NUMBER OVER`` construct.
For versions below 2005, LIMIT with OFFSET usage will fail.
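
When OFFSET is present, or when the LIMIT is not a simple integer, the
dialect wraps the statement using ``ROW_NUMBER OVER``, which requires an
ORDER BY; without one, a compilation error is raised.  A minimal sketch,
where ``some_table`` is a hypothetical :class:`_schema.Table`::

    stmt = select([some_table]).\
        order_by(some_table.c.id).\
        limit(10).offset(20)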

.. _mssql_isolation_level:

Transaction Isolation Level
---------------------------

All SQL Server dialects support setting of transaction isolation level
both via a dialect-specific parameter
:paramref:`_sa.create_engine.isolation_level`
accepted by :func:`_sa.create_engine`,
as well as the :paramref:`.Connection.execution_options.isolation_level`
argument as passed to
:meth:`_engine.Connection.execution_options`.
This feature works by issuing the
command ``SET TRANSACTION ISOLATION LEVEL <level>`` for
each new connection.

To set isolation level using :func:`_sa.create_engine`::

    engine = create_engine(
        "mssql+pyodbc://scott:tiger@ms_2008",
        isolation_level="REPEATABLE READ"
    )

To set using per-connection execution options::

    connection = engine.connect()
    connection = connection.execution_options(
        isolation_level="READ COMMITTED"
    )

Valid values for ``isolation_level`` include:

* ``AUTOCOMMIT`` - pyodbc / pymssql-specific
* ``READ COMMITTED``
* ``READ UNCOMMITTED``
* ``REPEATABLE READ``
* ``SERIALIZABLE``
* ``SNAPSHOT`` - specific to SQL Server

.. versionadded:: 1.2 added AUTOCOMMIT isolation level setting

.. seealso::

    :ref:`dbapi_autocommit`

Nullability
-----------
MSSQL has support for three levels of column nullability. The default
nullability allows nulls and is explicit in the CREATE TABLE
construct::

    name VARCHAR(20) NULL

If ``nullable=None`` is specified then no specification is made. In
other words the database's configured default is used. This will
render::

    name VARCHAR(20)

If ``nullable`` is ``True`` or ``False`` then the column will be
``NULL`` or ``NOT NULL`` respectively.
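
For example, a brief sketch of all three settings (the table and column
names are hypothetical)::

    t = Table('t', metadata,
        Column('a', String(20), nullable=True),   # a VARCHAR(20) NULL
        Column('b', String(20), nullable=False),  # b VARCHAR(20) NOT NULL
        Column('c', String(20), nullable=None))   # c VARCHAR(20)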

Date / Time Handling
--------------------
DATE and TIME are supported.   Bind parameters are converted
to datetime.datetime() objects as required by most MSSQL drivers,
and results are processed from strings if needed.
The DATE and TIME types are not available for MSSQL 2005 and
previous - if a server version below 2008 is detected, DDL
for these types will be issued as DATETIME.
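
For example, a sketch of the version-dependent DDL (the table and column
names are hypothetical)::

    from sqlalchemy import Date, Time

    t = Table('t', metadata,
        Column('d', Date()),   # DATE on SQL Server 2008+, else DATETIME
        Column('tm', Time()))  # TIME on SQL Server 2008+, else DATETIME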

.. _mssql_large_type_deprecation:

Large Text/Binary Type Deprecation
----------------------------------

Per
`SQL Server 2012/2014 Documentation <http://technet.microsoft.com/en-us/library/ms187993.aspx>`_,
the ``NTEXT``, ``TEXT`` and ``IMAGE`` datatypes are to be removed from SQL
Server in a future release.   SQLAlchemy normally relates these types to the
:class:`.UnicodeText`, :class:`_expression.TextClause` and
:class:`.LargeBinary` datatypes.

In order to accommodate this change, a new flag ``deprecate_large_types``
is added to the dialect, which will be automatically set based on detection
of the server version in use, if not otherwise set by the user.  The
behavior of this flag is as follows:

* When this flag is ``True``, the :class:`.UnicodeText`,
  :class:`_expression.TextClause` and
  :class:`.LargeBinary` datatypes, when used to render DDL, will render the
  types ``NVARCHAR(max)``, ``VARCHAR(max)``, and ``VARBINARY(max)``,
  respectively.  This is a new behavior as of the addition of this flag.

* When this flag is ``False``, the :class:`.UnicodeText`,
  :class:`_expression.TextClause` and
  :class:`.LargeBinary` datatypes, when used to render DDL, will render the
  types ``NTEXT``, ``TEXT``, and ``IMAGE``,
  respectively.  This is the long-standing behavior of these types.

* The flag begins with the value ``None``, before a database connection is
  established.   If the dialect is used to render DDL without the flag being
  set, it is interpreted the same as ``False``.

* On first connection, the dialect detects if SQL Server version 2012 or
  greater is in use; if the flag is still at ``None``, it sets it to ``True``
  or ``False`` based on whether 2012 or greater is detected.

* The flag can be set to either ``True`` or ``False`` when the dialect
  is created, typically via :func:`_sa.create_engine`::

        eng = create_engine("mssql+pymssql://user:pass@host/db",
                        deprecate_large_types=True)

* Complete control over whether the "old" or "new" types are rendered is
  available in all SQLAlchemy versions by using the UPPERCASE type objects
  instead: :class:`_types.NVARCHAR`, :class:`_types.VARCHAR`,
  :class:`_types.VARBINARY`, :class:`_types.TEXT`, :class:`_mssql.NTEXT`,
  :class:`_mssql.IMAGE`
  will always remain fixed and always output exactly that
  type, as in the sketch below.
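
  For example (a minimal sketch; the table and column names are
  hypothetical)::

      from sqlalchemy.dialects.mssql import IMAGE, NTEXT
      from sqlalchemy import Column, NVARCHAR, Table

      t = Table('t', metadata,
          Column('a', NVARCHAR(None)),  # always NVARCHAR(max)
          Column('b', NTEXT),           # always NTEXT
          Column('c', IMAGE))           # always IMAGE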

.. versionadded:: 1.0.0

.. _multipart_schema_names:

Multipart Schema Names
----------------------

SQL Server schemas sometimes require multiple parts to their "schema"
qualifier, that is, including the database name and owner name as separate
tokens, such as ``mydatabase.dbo.some_table``. These multipart names can be set
at once using the :paramref:`_schema.Table.schema` argument of
:class:`_schema.Table`::

    Table(
        "some_table", metadata,
        Column("q", String(50)),
        schema="mydatabase.dbo"
    )

When performing operations such as table or component reflection, a schema
argument that contains a dot will be split into separate
"database" and "owner" components in order to correctly query the SQL
Server information schema tables, as these two values are stored separately.
Additionally, when rendering the schema name for DDL or SQL, the two
components will be quoted separately for case sensitive names and other
special characters.   Given an argument as below::

    Table(
        "some_table", metadata,
        Column("q", String(50)),
        schema="MyDataBase.dbo"
    )

The above schema would be rendered as ``[MyDataBase].dbo``, and also in
reflection, would be reflected using "dbo" as the owner and "MyDataBase"
as the database name.

To control how the schema name is broken into database / owner,
specify brackets (which in SQL Server are quoting characters) in the name.
Below, the "owner" will be considered as ``MyDataBase.dbo`` and the
"database" will be None::

    Table(
        "some_table", metadata,
        Column("q", String(50)),
        schema="[MyDataBase.dbo]"
    )

To individually specify both database and owner name with special characters
or embedded dots, use two sets of brackets::

    Table(
        "some_table", metadata,
        Column("q", String(50)),
        schema="[MyDataBase.Period].[MyOwner.Dot]"
    )

.. versionchanged:: 1.2 the SQL Server dialect now treats brackets as
   identifier delimiters splitting the schema into separate database
   and owner tokens, to allow dots within either name itself.

.. _legacy_schema_rendering:

Legacy Schema Mode
------------------

Very old versions of the MSSQL dialect introduced the behavior such that a
schema-qualified table would be auto-aliased when used in a
SELECT statement; given a table::

    account_table = Table(
        'account', metadata,
        Column('id', Integer, primary_key=True),
        Column('info', String(100)),
        schema="customer_schema"
    )

this legacy mode of rendering would assume that "customer_schema.account"
would not be accepted by all parts of the SQL statement, as illustrated
below::

    >>> eng = create_engine("mssql+pymssql://mydsn", legacy_schema_aliasing=True)
    >>> print(account_table.select().compile(eng))
    SELECT account_1.id, account_1.info
    FROM customer_schema.account AS account_1

This mode of behavior is now off by default, as it appears to have served
no purpose; however in the case that legacy applications rely upon it,
it is available using the ``legacy_schema_aliasing`` argument to
:func:`_sa.create_engine` as illustrated above.

.. versionchanged:: 1.1 the ``legacy_schema_aliasing`` flag introduced
   in version 1.0.5 to allow disabling of legacy mode for schemas now
   defaults to False.


.. _mssql_indexes:

Clustered Index Support
-----------------------

The MSSQL dialect supports clustered indexes (and primary keys) via the
``mssql_clustered`` option.  This option is available to :class:`.Index`,
:class:`.UniqueConstraint`, and :class:`.PrimaryKeyConstraint`.

To generate a clustered index::

    Index("my_index", table.c.x, mssql_clustered=True)

which renders the index as ``CREATE CLUSTERED INDEX my_index ON table (x)``.

To generate a clustered primary key use::

    Table('my_table', metadata,
          Column('x', ...),
          Column('y', ...),
          PrimaryKeyConstraint("x", "y", mssql_clustered=True))

which will render the table, for example, as::

  CREATE TABLE my_table (x INTEGER NOT NULL, y INTEGER NOT NULL,
                         PRIMARY KEY CLUSTERED (x, y))

Similarly, we can generate a clustered unique constraint using::

    Table('my_table', metadata,
          Column('x', ...),
          Column('y', ...),
          PrimaryKeyConstraint("x"),
          UniqueConstraint("y", mssql_clustered=True),
          )

To explicitly request a non-clustered primary key (for example, when
a separate clustered index is desired), use::

    Table('my_table', metadata,
          Column('x', ...),
          Column('y', ...),
          PrimaryKeyConstraint("x", "y", mssql_clustered=False))

which will render the table, for example, as::

  CREATE TABLE my_table (x INTEGER NOT NULL, y INTEGER NOT NULL,
                         PRIMARY KEY NONCLUSTERED (x, y))

.. versionchanged:: 1.1 the ``mssql_clustered`` option now defaults
   to None, rather than False.  ``mssql_clustered=False`` now explicitly
   renders the NONCLUSTERED clause, whereas None omits the CLUSTERED
   clause entirely, allowing SQL Server defaults to take effect.


MSSQL-Specific Index Options
-----------------------------

In addition to clustering, the MSSQL dialect supports other special options
for :class:`.Index`.

INCLUDE
^^^^^^^

The ``mssql_include`` option renders INCLUDE(colname) for the given string
names::

    Index("my_index", table.c.x, mssql_include=['y'])

would render the index as ``CREATE INDEX my_index ON table (x) INCLUDE (y)``

.. _mssql_index_where:

Filtered Indexes
^^^^^^^^^^^^^^^^

The ``mssql_where`` option renders WHERE(condition) for the given
criterion::

    Index("my_index", table.c.x, mssql_where=table.c.x > 10)

would render the index as ``CREATE INDEX my_index ON table (x) WHERE x > 10``.

.. versionadded:: 1.3.4

Index ordering
^^^^^^^^^^^^^^

Index ordering is available via functional expressions, such as::

    Index("my_index", table.c.x.desc())

which would render the index as ``CREATE INDEX my_index ON table (x DESC)``.

.. seealso::

    :ref:`schema_indexes_functional`

Compatibility Levels
--------------------
MSSQL supports the notion of setting compatibility levels at the
database level. This allows, for instance, a database that is
compatible with SQL2000 to run on a SQL2005 database
server. ``server_version_info`` will always return the database
server version information (in this case SQL2005) and not the
compatibility level information. Because of this, if running under
a backwards compatibility mode SQLAlchemy may attempt to use T-SQL
statements that are unable to be parsed by the database server.
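
To check what the server itself reports, the dialect's
``server_version_info`` attribute may be inspected once a first connection
has been made (a brief sketch; the DSN is a placeholder)::

    engine = create_engine("mssql+pyodbc://scott:tiger@mydsn")
    with engine.connect():
        print(engine.dialect.server_version_info)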

Triggers
--------

SQLAlchemy by default uses OUTPUT INSERTED to get at newly
generated primary key values via IDENTITY columns or other
server side defaults.   MS-SQL does not
allow the usage of OUTPUT INSERTED on tables that have triggers.
To disable the usage of OUTPUT INSERTED on a per-table basis,
specify ``implicit_returning=False`` for each :class:`_schema.Table`
which has triggers::

    Table('mytable', metadata,
        Column('id', Integer, primary_key=True),
        # ...,
        implicit_returning=False
    )

Declarative form::

    class MyClass(Base):
        # ...
        __table_args__ = {'implicit_returning': False}


This option can also be specified engine-wide using the
``implicit_returning=False`` argument on :func:`_sa.create_engine`.
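
For example (a brief sketch; the DSN is a placeholder)::

    engine = create_engine(
        "mssql+pyodbc://scott:tiger@mydsn",
        implicit_returning=False
    )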

.. _mssql_rowcount_versioning:

Rowcount Support / ORM Versioning
---------------------------------

The SQL Server drivers may have limited ability to return the number
of rows updated from an UPDATE or DELETE statement.

As of this writing, the PyODBC driver is not able to return a rowcount when
OUTPUT INSERTED is used.  This impacts the SQLAlchemy ORM's versioning feature
in many cases where server-side value generators are in use in that while the
versioning operations can succeed, the ORM cannot always check that an UPDATE
or DELETE statement matched the number of rows expected, which is how it
verifies that the version identifier matched.   When this condition occurs, a
warning will be emitted but the operation will proceed.

The use of OUTPUT INSERTED can be disabled by setting the
:paramref:`_schema.Table.implicit_returning` flag to ``False`` on a particular
:class:`_schema.Table`, which in declarative looks like::

    class MyTable(Base):
        __tablename__ = 'mytable'
        id = Column(Integer, primary_key=True)
        stuff = Column(String(10))
        timestamp = Column(TIMESTAMP(), default=text('DEFAULT'))
        __mapper_args__ = {
            'version_id_col': timestamp,
            'version_id_generator': False,
        }
        __table_args__ = {
            'implicit_returning': False
        }

Enabling Snapshot Isolation
---------------------------

SQL Server has a default transaction
isolation mode that locks entire tables, and causes even mildly concurrent
applications to have long held locks and frequent deadlocks.
Enabling snapshot isolation for the database as a whole is recommended
for modern levels of concurrency support.  This is accomplished via the
following ALTER DATABASE commands executed at the SQL prompt::

    ALTER DATABASE MyDatabase SET ALLOW_SNAPSHOT_ISOLATION ON

    ALTER DATABASE MyDatabase SET READ_COMMITTED_SNAPSHOT ON

Background on SQL Server snapshot isolation is available at
http://msdn.microsoft.com/en-us/library/ms175095.aspx.
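
These are one-time administrative commands.  A hypothetical way to issue
them from SQLAlchemy is on an ``AUTOCOMMIT`` connection, since ``ALTER
DATABASE`` cannot run inside a transaction; note that enabling
``READ_COMMITTED_SNAPSHOT`` requires that no other connections are active
in the database::

    with engine.connect().execution_options(
            isolation_level="AUTOCOMMIT") as conn:
        conn.execute(
            "ALTER DATABASE MyDatabase SET ALLOW_SNAPSHOT_ISOLATION ON")
        conn.execute(
            "ALTER DATABASE MyDatabase SET READ_COMMITTED_SNAPSHOT ON")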

"""  # noqa

import codecs
import datetime
import operator
import re

from . import information_schema as ischema
from ... import engine
from ... import exc
from ... import schema as sa_schema
from ... import sql
from ... import types as sqltypes
from ... import util
from ...engine import default
from ...engine import reflection
from ...sql import compiler
from ...sql import expression
from ...sql import func
from ...sql import quoted_name
from ...sql import util as sql_util
from ...types import BIGINT
from ...types import BINARY
from ...types import CHAR
from ...types import DATE
from ...types import DATETIME
from ...types import DECIMAL
from ...types import FLOAT
from ...types import INTEGER
from ...types import NCHAR
from ...types import NUMERIC
from ...types import NVARCHAR
from ...types import SMALLINT
from ...types import TEXT
from ...types import VARCHAR
from ...util import update_wrapper
from ...util.langhelpers import public_factory


# http://sqlserverbuilds.blogspot.com/
MS_2016_VERSION = (13,)
MS_2014_VERSION = (12,)
MS_2012_VERSION = (11,)
MS_2008_VERSION = (10,)
MS_2005_VERSION = (9,)
MS_2000_VERSION = (8,)

RESERVED_WORDS = set(
    [
        "add",
        "all",
        "alter",
        "and",
        "any",
        "as",
        "asc",
        "authorization",
        "backup",
        "begin",
        "between",
        "break",
        "browse",
        "bulk",
        "by",
        "cascade",
        "case",
        "check",
        "checkpoint",
        "close",
        "clustered",
        "coalesce",
        "collate",
        "column",
        "commit",
        "compute",
        "constraint",
        "contains",
        "containstable",
        "continue",
        "convert",
        "create",
        "cross",
        "current",
        "current_date",
        "current_time",
        "current_timestamp",
        "current_user",
        "cursor",
        "database",
        "dbcc",
        "deallocate",
        "declare",
        "default",
        "delete",
        "deny",
        "desc",
        "disk",
        "distinct",
        "distributed",
        "double",
        "drop",
        "dump",
        "else",
        "end",
        "errlvl",
        "escape",
        "except",
        "exec",
        "execute",
        "exists",
        "exit",
        "external",
        "fetch",
        "file",
        "fillfactor",
        "for",
        "foreign",
        "freetext",
        "freetexttable",
        "from",
        "full",
        "function",
        "goto",
        "grant",
        "group",
        "having",
        "holdlock",
        "identity",
        "identity_insert",
        "identitycol",
        "if",
        "in",
        "index",
        "inner",
        "insert",
        "intersect",
        "into",
        "is",
        "join",
        "key",
        "kill",
        "left",
        "like",
        "lineno",
        "load",
        "merge",
        "national",
        "nocheck",
        "nonclustered",
        "not",
        "null",
        "nullif",
        "of",
        "off",
        "offsets",
        "on",
        "open",
        "opendatasource",
        "openquery",
        "openrowset",
        "openxml",
        "option",
        "or",
        "order",
        "outer",
        "over",
        "percent",
        "pivot",
        "plan",
        "precision",
        "primary",
        "print",
        "proc",
        "procedure",
        "public",
        "raiserror",
        "read",
        "readtext",
        "reconfigure",
        "references",
        "replication",
        "restore",
        "restrict",
        "return",
        "revert",
        "revoke",
        "right",
        "rollback",
        "rowcount",
        "rowguidcol",
        "rule",
        "save",
        "schema",
        "securityaudit",
        "select",
        "session_user",
        "set",
        "setuser",
        "shutdown",
        "some",
        "statistics",
        "system_user",
        "table",
        "tablesample",
        "textsize",
        "then",
        "to",
        "top",
        "tran",
        "transaction",
        "trigger",
        "truncate",
        "tsequal",
        "union",
        "unique",
        "unpivot",
        "update",
        "updatetext",
        "use",
        "user",
        "values",
        "varying",
        "view",
        "waitfor",
        "when",
        "where",
        "while",
        "with",
        "writetext",
    ]
)


class REAL(sqltypes.REAL):
    __visit_name__ = "REAL"

    def __init__(self, **kw):
        # REAL is a synonym for FLOAT(24) on SQL server.
        # it is only accepted as the word "REAL" in DDL, the numeric
        # precision value is not allowed to be present
        kw.setdefault("precision", 24)
        super(REAL, self).__init__(**kw)


class TINYINT(sqltypes.Integer):
    __visit_name__ = "TINYINT"


# MSSQL DATE/TIME types have varied behavior, sometimes returning
# strings.  MSDate/TIME check for everything, and always
# filter bind parameters into datetime objects (required by pyodbc,
# not sure about other dialects).


class _MSDate(sqltypes.Date):
    def bind_processor(self, dialect):
        def process(value):
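            # exact type check: datetime.datetime is a subclass of
            # datetime.date and must pass through unchanged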
            if type(value) == datetime.date:
                return datetime.datetime(value.year, value.month, value.day)
            else:
                return value

        return process

    _reg = re.compile(r"(\d+)-(\d+)-(\d+)")

    def result_processor(self, dialect, coltype):
        def process(value):
            if isinstance(value, datetime.datetime):
                return value.date()
            elif isinstance(value, util.string_types):
                m = self._reg.match(value)
                if not m:
                    raise ValueError(
                        "could not parse %r as a date value" % (value,)
                    )
                return datetime.date(*[int(x or 0) for x in m.groups()])
            else:
                return value

        return process


class TIME(sqltypes.TIME):
    def __init__(self, precision=None, **kwargs):
        self.precision = precision
        super(TIME, self).__init__()

    __zero_date = datetime.date(1900, 1, 1)

    def bind_processor(self, dialect):
        def process(value):
            if isinstance(value, datetime.datetime):
                value = datetime.datetime.combine(
                    self.__zero_date, value.time()
                )
            elif isinstance(value, datetime.time):
                """issue #5339
                per: https://github.com/mkleehammer/pyodbc/wiki/Tips-and-Tricks-by-Database-Platform#time-columns
                pass TIME value as string
                """  # noqa
                value = str(value)
            return value

        return process

    _reg = re.compile(r"(\d+):(\d+):(\d+)(?:\.(\d{0,6}))?")

    def result_processor(self, dialect, coltype):
        def process(value):
            if isinstance(value, datetime.datetime):
                return value.time()
            elif isinstance(value, util.string_types):
                m = self._reg.match(value)
                if not m:
                    raise ValueError(
                        "could not parse %r as a time value" % (value,)
                    )
                return datetime.time(*[int(x or 0) for x in m.groups()])
            else:
                return value

        return process


_MSTime = TIME


class _DateTimeBase(object):
    def bind_processor(self, dialect):
        def process(value):
            if type(value) == datetime.date:
                return datetime.datetime(value.year, value.month, value.day)
            else:
                return value

        return process


class _MSDateTime(_DateTimeBase, sqltypes.DateTime):
    pass


class SMALLDATETIME(_DateTimeBase, sqltypes.DateTime):
    __visit_name__ = "SMALLDATETIME"


class DATETIME2(_DateTimeBase, sqltypes.DateTime):
    __visit_name__ = "DATETIME2"

    def __init__(self, precision=None, **kw):
        super(DATETIME2, self).__init__(**kw)
        self.precision = precision


# TODO: is this not an Interval ?
class DATETIMEOFFSET(sqltypes.TypeEngine):
    __visit_name__ = "DATETIMEOFFSET"

    def __init__(self, precision=None, **kwargs):
        self.precision = precision


class _UnicodeLiteral(object):
    def literal_processor(self, dialect):
        def process(value):

            value = value.replace("'", "''")

            if dialect.identifier_preparer._double_percents:
                value = value.replace("%", "%%")

            return "N'%s'" % value

        return process


class _MSUnicode(_UnicodeLiteral, sqltypes.Unicode):
    pass


class _MSUnicodeText(_UnicodeLiteral, sqltypes.UnicodeText):
    pass


class TIMESTAMP(sqltypes._Binary):
    """Implement the SQL Server TIMESTAMP type.

    Note this is **completely different** than the SQL Standard
    TIMESTAMP type, which is not supported by SQL Server.  It
    is a read-only datatype that does not support INSERT of values.

    .. versionadded:: 1.2

    .. seealso::

        :class:`_mssql.ROWVERSION`

    """

    __visit_name__ = "TIMESTAMP"

    # expected by _Binary to be present
    length = None

    def __init__(self, convert_int=False):
        """Construct a TIMESTAMP or ROWVERSION type.

        :param convert_int: if True, binary integer values will
         be converted to integers on read.
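
        E.g., a brief sketch (the surrounding ``Table``, ``Column`` and
        ``metadata`` are assumed)::

            from sqlalchemy.dialects.mssql import TIMESTAMP

            t = Table(
                't', metadata,
                Column('rv', TIMESTAMP(convert_int=True))
            )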

        .. versionadded:: 1.2

        """
        self.convert_int = convert_int

    def result_processor(self, dialect, coltype):
        super_ = super(TIMESTAMP, self).result_processor(dialect, coltype)
        if self.convert_int:

            def process(value):
                value = super_(value)
                if value is not None:
                    # https://stackoverflow.com/a/30403242/34549
                    value = int(codecs.encode(value, "hex"), 16)
                return value

            return process
        else:
            return super_


class ROWVERSION(TIMESTAMP):
    """Implement the SQL Server ROWVERSION type.

    The ROWVERSION datatype is a SQL Server synonym for the TIMESTAMP
    datatype, however current SQL Server documentation suggests using
    ROWVERSION for new datatypes going forward.

    The ROWVERSION datatype does **not** reflect (e.g. introspect) from the
    database as itself; the returned datatype will be
    :class:`_mssql.TIMESTAMP`.

    This is a read-only datatype that does not support INSERT of values.

    .. versionadded:: 1.2

    .. seealso::

        :class:`_mssql.TIMESTAMP`

    """

    __visit_name__ = "ROWVERSION"


class NTEXT(sqltypes.UnicodeText):

    """MSSQL NTEXT type, for variable-length unicode text up to 2^30
    characters."""

    __visit_name__ = "NTEXT"


class VARBINARY(sqltypes.VARBINARY, sqltypes.LargeBinary):
    """The MSSQL VARBINARY type.

    This type is present to support "deprecate_large_types" mode where
    either ``VARBINARY(max)`` or IMAGE is rendered.   Otherwise, this type
    object is redundant vs. :class:`_types.VARBINARY`.

    .. versionadded:: 1.0.0

    .. seealso::

        :ref:`mssql_large_type_deprecation`

    """

    __visit_name__ = "VARBINARY"


class IMAGE(sqltypes.LargeBinary):
    __visit_name__ = "IMAGE"


class XML(sqltypes.Text):
    """MSSQL XML type.

    This is a placeholder type for reflection purposes that does not include
    any Python-side datatype support.   It also does not currently support
    additional arguments, such as "CONTENT", "DOCUMENT",
    "xml_schema_collection".

    .. versionadded:: 1.1.11

    """

    __visit_name__ = "XML"


class BIT(sqltypes.TypeEngine):
    __visit_name__ = "BIT"


class MONEY(sqltypes.TypeEngine):
    __visit_name__ = "MONEY"


class SMALLMONEY(sqltypes.TypeEngine):
    __visit_name__ = "SMALLMONEY"


class UNIQUEIDENTIFIER(sqltypes.TypeEngine):
    __visit_name__ = "UNIQUEIDENTIFIER"


class SQL_VARIANT(sqltypes.TypeEngine):
    __visit_name__ = "SQL_VARIANT"


class TryCast(sql.elements.Cast):
    """Represent a SQL Server TRY_CAST expression."""

    __visit_name__ = "try_cast"

    def __init__(self, *arg, **kw):
        """Create a TRY_CAST expression.

        :class:`.TryCast` is a subclass of SQLAlchemy's :class:`.Cast`
        construct, and works in the same way, except that the SQL expression
        rendered is "TRY_CAST" rather than "CAST"::

            from sqlalchemy import select
            from sqlalchemy import Numeric
            from sqlalchemy.dialects.mssql import try_cast

            stmt = select([
                try_cast(product_table.c.unit_price, Numeric(10, 4))
            ])

        The above would render::

            SELECT TRY_CAST (product_table.unit_price AS NUMERIC(10, 4))
            FROM product_table

        .. versionadded:: 1.3.7

        """
        super(TryCast, self).__init__(*arg, **kw)


try_cast = public_factory(TryCast, ".dialects.mssql.try_cast")

# old names.
MSDateTime = _MSDateTime
MSDate = _MSDate
MSReal = REAL
MSTinyInteger = TINYINT
MSTime = TIME
MSSmallDateTime = SMALLDATETIME
MSDateTime2 = DATETIME2
MSDateTimeOffset = DATETIMEOFFSET
MSText = TEXT
MSNText = NTEXT
MSString = VARCHAR
MSNVarchar = NVARCHAR
MSChar = CHAR
MSNChar = NCHAR
MSBinary = BINARY
MSVarBinary = VARBINARY
MSImage = IMAGE
MSBit = BIT
MSMoney = MONEY
MSSmallMoney = SMALLMONEY
MSUniqueIdentifier = UNIQUEIDENTIFIER
MSVariant = SQL_VARIANT

ischema_names = {
    "int": INTEGER,
    "bigint": BIGINT,
    "smallint": SMALLINT,
    "tinyint": TINYINT,
    "varchar": VARCHAR,
    "nvarchar": NVARCHAR,
    "char": CHAR,
    "nchar": NCHAR,
    "text": TEXT,
    "ntext": NTEXT,
    "decimal": DECIMAL,
    "numeric": NUMERIC,
    "float": FLOAT,
    "datetime": DATETIME,
    "datetime2": DATETIME2,
    "datetimeoffset": DATETIMEOFFSET,
    "date": DATE,
    "time": TIME,
    "smalldatetime": SMALLDATETIME,
    "binary": BINARY,
    "varbinary": VARBINARY,
    "bit": BIT,
    "real": REAL,
    "image": IMAGE,
    "xml": XML,
    "timestamp": TIMESTAMP,
    "money": MONEY,
    "smallmoney": SMALLMONEY,
    "uniqueidentifier": UNIQUEIDENTIFIER,
    "sql_variant": SQL_VARIANT,
}


class MSTypeCompiler(compiler.GenericTypeCompiler):
    def _extend(self, spec, type_, length=None):
        """Extend a string-type declaration with standard SQL
        COLLATE annotations.

        """

        if getattr(type_, "collation", None):
            collation = "COLLATE %s" % type_.collation
        else:
            collation = None

        if not length:
            length = type_.length

        if length:
            spec = spec + "(%s)" % length

        return " ".join([c for c in (spec, collation) if c is not None])

    def visit_FLOAT(self, type_, **kw):
        precision = getattr(type_, "precision", None)
        if precision is None:
            return "FLOAT"
        else:
            return "FLOAT(%(precision)s)" % {"precision": precision}

    def visit_TINYINT(self, type_, **kw):
        return "TINYINT"

    def visit_DATETIMEOFFSET(self, type_, **kw):
        if type_.precision is not None:
            return "DATETIMEOFFSET(%s)" % type_.precision
        else:
            return "DATETIMEOFFSET"

    def visit_TIME(self, type_, **kw):
        precision = getattr(type_, "precision", None)
        if precision is not None:
            return "TIME(%s)" % precision
        else:
            return "TIME"

    def visit_TIMESTAMP(self, type_, **kw):
        return "TIMESTAMP"

    def visit_ROWVERSION(self, type_, **kw):
        return "ROWVERSION"

    def visit_DATETIME2(self, type_, **kw):
        precision = getattr(type_, "precision", None)
        if precision is not None:
            return "DATETIME2(%s)" % precision
        else:
            return "DATETIME2"

    def visit_SMALLDATETIME(self, type_, **kw):
        return "SMALLDATETIME"

    def visit_unicode(self, type_, **kw):
        return self.visit_NVARCHAR(type_, **kw)

    def visit_text(self, type_, **kw):
        if self.dialect.deprecate_large_types:
            return self.visit_VARCHAR(type_, **kw)
        else:
            return self.visit_TEXT(type_, **kw)

    def visit_unicode_text(self, type_, **kw):
        if self.dialect.deprecate_large_types:
            return self.visit_NVARCHAR(type_, **kw)
        else:
            return self.visit_NTEXT(type_, **kw)

    def visit_NTEXT(self, type_, **kw):
        return self._extend("NTEXT", type_)

    def visit_TEXT(self, type_, **kw):
        return self._extend("TEXT", type_)

    def visit_VARCHAR(self, type_, **kw):
        return self._extend("VARCHAR", type_, length=type_.length or "max")

    def visit_CHAR(self, type_, **kw):
        return self._extend("CHAR", type_)

    def visit_NCHAR(self, type_, **kw):
        return self._extend("NCHAR", type_)

    def visit_NVARCHAR(self, type_, **kw):
        return self._extend("NVARCHAR", type_, length=type_.length or "max")

    def visit_date(self, type_, **kw):
        if self.dialect.server_version_info < MS_2008_VERSION:
            return self.visit_DATETIME(type_, **kw)
        else:
            return self.visit_DATE(type_, **kw)

    def visit_time(self, type_, **kw):
        if self.dialect.server_version_info < MS_2008_VERSION:
            return self.visit_DATETIME(type_, **kw)
        else:
            return self.visit_TIME(type_, **kw)

    def visit_large_binary(self, type_, **kw):
        if self.dialect.deprecate_large_types:
            return self.visit_VARBINARY(type_, **kw)
        else:
            return self.visit_IMAGE(type_, **kw)

    def visit_IMAGE(self, type_, **kw):
        return "IMAGE"

    def visit_XML(self, type_, **kw):
        return "XML"

    def visit_VARBINARY(self, type_, **kw):
        return self._extend("VARBINARY", type_, length=type_.length or "max")

    def visit_boolean(self, type_, **kw):
        return self.visit_BIT(type_)

    def visit_BIT(self, type_, **kw):
        return "BIT"

    def visit_MONEY(self, type_, **kw):
        return "MONEY"

    def visit_SMALLMONEY(self, type_, **kw):
        return "SMALLMONEY"

    def visit_UNIQUEIDENTIFIER(self, type_, **kw):
        return "UNIQUEIDENTIFIER"

    def visit_SQL_VARIANT(self, type_, **kw):
        return "SQL_VARIANT"


class MSExecutionContext(default.DefaultExecutionContext):
    _enable_identity_insert = False
    _select_lastrowid = False
    _result_proxy = None
    _lastrowid = None

    def _opt_encode(self, statement):
        if not self.dialect.supports_unicode_statements:
            return self.dialect._encoder(statement)[0]
        else:
            return statement

    def pre_exec(self):
        """Activate IDENTITY_INSERT if needed."""

        if self.isinsert:
            tbl = self.compiled.statement.table
            seq_column = tbl._autoincrement_column
            insert_has_sequence = seq_column is not None

            if insert_has_sequence:
                self._enable_identity_insert = (
                    seq_column.key in self.compiled_parameters[0]
                ) or (
                    self.compiled.statement.parameters
                    and (
                        (
                            self.compiled.statement._has_multi_parameters
                            and (
                                seq_column.key
                                in self.compiled.statement.parameters[0]
                                or seq_column
                                in self.compiled.statement.parameters[0]
                            )
                        )
                        or (
                            not self.compiled.statement._has_multi_parameters
                            and (
                                seq_column.key
                                in self.compiled.statement.parameters
                                or seq_column
                                in self.compiled.statement.parameters
                            )
                        )
                    )
                )
            else:
                self._enable_identity_insert = False

            self._select_lastrowid = (
                not self.compiled.inline
                and insert_has_sequence
                and not self.compiled.returning
                and not self._enable_identity_insert
                and not self.executemany
            )

            if self._enable_identity_insert:
                self.root_connection._cursor_execute(
                    self.cursor,
                    self._opt_encode(
                        "SET IDENTITY_INSERT %s ON"
                        % self.identifier_preparer.format_table(tbl)
                    ),
                    (),
                    self,
                )

    def post_exec(self):
        """Disable IDENTITY_INSERT if enabled."""

        conn = self.root_connection
        if self._select_lastrowid:
            if self.dialect.use_scope_identity:
                conn._cursor_execute(
                    self.cursor,
                    "SELECT scope_identity() AS lastrowid",
                    (),
                    self,
                )
            else:
                conn._cursor_execute(
                    self.cursor, "SELECT @@identity AS lastrowid", (), self
                )
            # fetchall() ensures the cursor is consumed without closing it
            row = self.cursor.fetchall()[0]
            self._lastrowid = int(row[0])

        if (
            self.isinsert or self.isupdate or self.isdelete
        ) and self.compiled.returning:
            self._result_proxy = engine.FullyBufferedResultProxy(self)

        if self._enable_identity_insert:
            conn._cursor_execute(
                self.cursor,
                self._opt_encode(
                    "SET IDENTITY_INSERT %s OFF"
                    % self.identifier_preparer.format_table(
                        self.compiled.statement.table
                    )
                ),
                (),
                self,
            )

    def get_lastrowid(self):
        return self._lastrowid

    def handle_dbapi_exception(self, e):
        if self._enable_identity_insert:
            try:
                self.cursor.execute(
                    self._opt_encode(
                        "SET IDENTITY_INSERT %s OFF"
                        % self.identifier_preparer.format_table(
                            self.compiled.statement.table
                        )
                    )
                )
            except Exception:
                pass

    def get_result_proxy(self):
        if self._result_proxy:
            return self._result_proxy
        else:
            return engine.ResultProxy(self)


class MSSQLCompiler(compiler.SQLCompiler):
    returning_precedes_values = True

    extract_map = util.update_copy(
        compiler.SQLCompiler.extract_map,
        {
            "doy": "dayofyear",
            "dow": "weekday",
            "milliseconds": "millisecond",
            "microseconds": "microsecond",
        },
    )

    def __init__(self, *args, **kwargs):
        self.tablealiases = {}
        super(MSSQLCompiler, self).__init__(*args, **kwargs)

    def _with_legacy_schema_aliasing(fn):
        def decorate(self, *arg, **kw):
            if self.dialect.legacy_schema_aliasing:
                return fn(self, *arg, **kw)
            else:
                super_ = getattr(super(MSSQLCompiler, self), fn.__name__)
                return super_(*arg, **kw)

        return decorate

    def visit_now_func(self, fn, **kw):
        return "CURRENT_TIMESTAMP"

    def visit_current_date_func(self, fn, **kw):
        return "GETDATE()"

    def visit_length_func(self, fn, **kw):
        return "LEN%s" % self.function_argspec(fn, **kw)

    def visit_char_length_func(self, fn, **kw):
        return "LEN%s" % self.function_argspec(fn, **kw)

    def visit_concat_op_binary(self, binary, operator, **kw):
        return "%s + %s" % (
            self.process(binary.left, **kw),
            self.process(binary.right, **kw),
        )

    def visit_true(self, expr, **kw):
        return "1"

    def visit_false(self, expr, **kw):
        return "0"

    def visit_match_op_binary(self, binary, operator, **kw):
        return "CONTAINS (%s, %s)" % (
            self.process(binary.left, **kw),
            self.process(binary.right, **kw),
        )

    def get_select_precolumns(self, select, **kw):
        """MS-SQL puts TOP, its version of LIMIT, here."""

        s = ""
        if select._distinct:
            s += "DISTINCT "

        if select._simple_int_limit and (
            select._offset_clause is None
            or (select._simple_int_offset and select._offset == 0)
        ):
            # ODBC drivers and possibly others
            # don't support bind params in the SELECT clause on SQL Server.
            # so have to use literal here.
            s += "TOP %d " % select._limit

        if s:
            return s
        else:
            return compiler.SQLCompiler.get_select_precolumns(
                self, select, **kw
            )

    def get_from_hint_text(self, table, text):
        return text

    def get_crud_hint_text(self, table, text):
        return text

    def limit_clause(self, select, **kw):
        # Limit in mssql is after the select keyword
        return ""

    def visit_try_cast(self, element, **kw):
        return "TRY_CAST (%s AS %s)" % (
            self.process(element.clause, **kw),
            self.process(element.typeclause, **kw),
        )

    def visit_select(self, select, **kwargs):
        """Look for ``LIMIT`` and OFFSET in a select statement, and if
        found, try to wrap the statement in a subquery with a
        ``row_number()`` criterion.

        """
        if (
            (not select._simple_int_limit and select._limit_clause is not None)
            or (
                select._offset_clause is not None
                and not select._simple_int_offset
                or select._offset
            )
        ) and not getattr(select, "_mssql_visit", None):

            # to use ROW_NUMBER(), an ORDER BY is required.
            if not select._order_by_clause.clauses:
                raise exc.CompileError(
                    "MSSQL requires an order_by when "
                    "using an OFFSET or a non-simple "
                    "LIMIT clause"
                )

            _order_by_clauses = [
                sql_util.unwrap_label_reference(elem)
                for elem in select._order_by_clause.clauses
            ]

            limit_clause = select._limit_clause
            offset_clause = select._offset_clause
            kwargs["select_wraps_for"] = select
            select = select._generate()
            select._mssql_visit = True
            select = (
                select.column(
                    sql.func.ROW_NUMBER()
                    .over(order_by=_order_by_clauses)
                    .label("mssql_rn")
                )
                .order_by(None)
                .alias()
            )

            mssql_rn = sql.column("mssql_rn")
            limitselect = sql.select(
                [c for c in select.c if c.key != "mssql_rn"]
            )
            if offset_clause is not None:
                limitselect.append_whereclause(mssql_rn > offset_clause)
                if limit_clause is not None:
                    limitselect.append_whereclause(
                        mssql_rn <= (limit_clause + offset_clause)
                    )
            else:
                limitselect.append_whereclause(mssql_rn <= (limit_clause))
            return self.process(limitselect, **kwargs)
        else:
            return compiler.SQLCompiler.visit_select(self, select, **kwargs)

    @_with_legacy_schema_aliasing
    def visit_table(self, table, mssql_aliased=False, iscrud=False, **kwargs):
        if mssql_aliased is table or iscrud:
            return super(MSSQLCompiler, self).visit_table(table, **kwargs)

        # alias schema-qualified tables
        alias = self._schema_aliased_table(table)
        if alias is not None:
            return self.process(alias, mssql_aliased=table, **kwargs)
        else:
            return super(MSSQLCompiler, self).visit_table(table, **kwargs)

    @_with_legacy_schema_aliasing
    def visit_alias(self, alias, **kw):
        # translate for schema-qualified table aliases
        kw["mssql_aliased"] = alias.original
        return super(MSSQLCompiler, self).visit_alias(alias, **kw)

    @_with_legacy_schema_aliasing
    def visit_column(self, column, add_to_result_map=None, **kw):
        if (
            column.table is not None
            and (not self.isupdate and not self.isdelete)
            or self.is_subquery()
        ):
            # translate for schema-qualified table aliases
            t = self._schema_aliased_table(column.table)
            if t is not None:
                converted = expression._corresponding_column_or_error(
                    t, column
                )
                if add_to_result_map is not None:
                    add_to_result_map(
                        column.name,
                        column.name,
                        (column, column.name, column.key),
                        column.type,
                    )

                return super(MSSQLCompiler, self).visit_column(converted, **kw)

        return super(MSSQLCompiler, self).visit_column(
            column, add_to_result_map=add_to_result_map, **kw
        )

    def _schema_aliased_table(self, table):
        if getattr(table, "schema", None) is not None:
            if table not in self.tablealiases:
                self.tablealiases[table] = table.alias()
            return self.tablealiases[table]
        else:
            return None

    def visit_extract(self, extract, **kw):
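        # e.g. (sketch) extract("month", t.c.somedate) comes out as
        # "DATEPART(month, t.somedate)"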
        field = self.extract_map.get(extract.field, extract.field)
        return "DATEPART(%s, %s)" % (field, self.process(extract.expr, **kw))

    def visit_savepoint(self, savepoint_stmt):
        return "SAVE TRANSACTION %s" % self.preparer.format_savepoint(
            savepoint_stmt
        )

    def visit_rollback_to_savepoint(self, savepoint_stmt):
        return "ROLLBACK TRANSACTION %s" % self.preparer.format_savepoint(
            savepoint_stmt
        )

    def visit_binary(self, binary, **kwargs):
        """Move bind parameters to the right-hand side of an operator, where
        possible.

        """
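        # e.g. (sketch) literal("x") == t.c.y renders as "t.y = :param_1",
        # placing the typed column expression on the left of the comparison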
        if (
            isinstance(binary.left, expression.BindParameter)
            and binary.operator == operator.eq
            and not isinstance(binary.right, expression.BindParameter)
        ):
            return self.process(
                expression.BinaryExpression(
                    binary.right, binary.left, binary.operator
                ),
                **kwargs
            )
        return super(MSSQLCompiler, self).visit_binary(binary, **kwargs)

    def returning_clause(self, stmt, returning_cols):

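        # SQL Server expresses RETURNING via OUTPUT, selecting from the
        # "inserted" / "deleted" pseudo-tables; e.g. (sketch) an INSERT
        # with .returning(t.c.id) compiles along the lines of
        #
        #     INSERT INTO t (x) OUTPUT inserted.id VALUES (:x)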
        if self.isinsert or self.isupdate:
            target = stmt.table.alias("inserted")
        else:
            target = stmt.table.alias("deleted")

        adapter = sql_util.ClauseAdapter(target)

        columns = [
            self._label_select_column(
                None, adapter.traverse(c), True, False, {}
            )
            for c in expression._select_iterables(returning_cols)
        ]

        return "OUTPUT " + ", ".join(columns)

    def get_cte_preamble(self, recursive):
        # SQL Server finds it too inconvenient to accept
        # the entirely optional, SQL-standard-specified
        # "RECURSIVE" keyword with its "WITH",
        # so here we go
        return "WITH"

    def label_select_column(self, select, column, asfrom):
        if isinstance(column, expression.Function):
            return column.label(None)
        else:
            return super(MSSQLCompiler, self).label_select_column(
                select, column, asfrom
            )

    def for_update_clause(self, select):
        # "FOR UPDATE" is only allowed on "DECLARE CURSOR" which
        # SQLAlchemy doesn't use
        return ""

    def order_by_clause(self, select, **kw):
        # MSSQL only allows ORDER BY in subqueries if there is a LIMIT
        if self.is_subquery() and not select._limit:
            # avoid processing the order by clause if we won't end up
            # using it, because we don't want all the bind params tacked
            # onto the positional list if that is what the dbapi requires
            return ""

        order_by = self.process(select._order_by_clause, **kw)

        if order_by:
            return " ORDER BY " + order_by
        else:
            return ""

    def update_from_clause(
        self, update_stmt, from_table, extra_froms, from_hints, **kw
    ):
        """Render the UPDATE..FROM clause specific to MSSQL.

        In MSSQL, if the UPDATE statement involves an alias of the table to
        be updated, then the table itself must be added to the FROM list as
        well. Otherwise, it is optional. Here, we add it regardless.

        """
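        # e.g. (an illustrative sketch, names assumed) an UPDATE against
        # "t" joined to "other" is completed as:
        #
        #     UPDATE t SET x=:x FROM t, other WHERE t.id = other.t_id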
        return "FROM " + ", ".join(
            t._compiler_dispatch(self, asfrom=True, fromhints=from_hints, **kw)
            for t in [from_table] + extra_froms
        )

    def delete_table_clause(self, delete_stmt, from_table, extra_froms):
        """If we have extra froms, make sure we render any alias as a hint."""
        ashint = False
        if extra_froms:
            ashint = True
        return from_table._compiler_dispatch(
            self, asfrom=True, iscrud=True, ashint=ashint
        )

    def delete_extra_from_clause(
        self, delete_stmt, from_table, extra_froms, from_hints, **kw
    ):
        """Render the DELETE .. FROM clause specific to MSSQL.

        Yes, it has the FROM keyword twice.

        """
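        # e.g. (an illustrative sketch):
        #
        #     DELETE FROM t FROM t, other WHERE t.id = other.t_id
        #
        # where the first FROM names the deletion target and the second,
        # rendered here, supplies the join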
        return "FROM " + ", ".join(
            t._compiler_dispatch(self, asfrom=True, fromhints=from_hints, **kw)
            for t in [from_table] + extra_froms
        )

    def visit_empty_set_expr(self, type_):
        return "SELECT 1 WHERE 1!=1"

    def visit_is_distinct_from_binary(self, binary, operator, **kw):
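        # SQL Server has no IS DISTINCT FROM; INTERSECT compares NULLs as
        # equal, so "NOT EXISTS (SELECT a INTERSECT SELECT b)" is true
        # exactly when a and b differ, NULLs included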
        return "NOT EXISTS (SELECT %s INTERSECT SELECT %s)" % (
            self.process(binary.left),
            self.process(binary.right),
        )

    def visit_isnot_distinct_from_binary(self, binary, operator, **kw):
        return "EXISTS (SELECT %s INTERSECT SELECT %s)" % (
            self.process(binary.left),
            self.process(binary.right),
        )


class MSSQLStrictCompiler(MSSQLCompiler):

    """A subclass of MSSQLCompiler which disables the usage of bind
    parameters where not allowed natively by MS-SQL.

    A dialect may use this compiler on a platform where native
    binds are used.

    """

    ansi_bind_rules = True

    def visit_in_op_binary(self, binary, operator, **kw):
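        # render the IN list with inline literals rather than binds,
        # e.g. (sketch) "t.x IN (1, 2, 3)"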
        kw["literal_binds"] = True
        return "%s IN %s" % (
            self.process(binary.left, **kw),
            self.process(binary.right, **kw),
        )

    def visit_notin_op_binary(self, binary, operator, **kw):
        kw["literal_binds"] = True
        return "%s NOT IN %s" % (
            self.process(binary.left, **kw),
            self.process(binary.right, **kw),
        )

    def render_literal_value(self, value, type_):
        """
        For date and datetime values, convert to a string
        format acceptable to MSSQL. That seems to be the
        so-called ODBC canonical date format which looks
        like this:

            yyyy-mm-dd hh:mi:ss.mmm(24h)

        For other data types, call the base class implementation.
        """
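        # e.g. (sketch) datetime.datetime(2020, 1, 3, 12, 30) renders as
        # '2020-01-03 12:30:00'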
        # datetime and date are both subclasses of datetime.date
        if issubclass(type(value), datetime.date):
            # SQL Server wants single quotes around the date string.
            return "'" + str(value) + "'"
        else:
            return super(MSSQLStrictCompiler, self).render_literal_value(
                value, type_
            )


class MSDDLCompiler(compiler.DDLCompiler):
    def get_column_specification(self, column, **kwargs):
        colspec = self.preparer.format_column(column)

        # a datatype is not rendered for a computed column
        if column.computed is not None:
            colspec += " " + self.process(column.computed)
        else:
            colspec += " " + self.dialect.type_compiler.process(
                column.type, type_expression=column
            )

        if column.nullable is not None:
            if (
                not column.nullable
                or column.primary_key
                or isinstance(column.default, sa_schema.Sequence)
                or column.autoincrement is True
            ):
                colspec += " NOT NULL"
            elif column.computed is None:
                # don't specify "NULL" for computed columns
                colspec += " NULL"

        if column.table is None:
            raise exc.CompileError(
                "mssql requires Table-bound columns "
                "in order to generate DDL"
            )

        # install an IDENTITY Sequence if we have either a sequence or an
        # implicit IDENTITY column
        if isinstance(column.default, sa_schema.Sequence):

            if (
                column.default.start is not None
                or column.default.increment is not None
                or column is not column.table._autoincrement_column
            ):
                util.warn_deprecated(
                    "Use of Sequence with SQL Server in order to affect the "
                    "parameters of the IDENTITY value is deprecated, as "
                    "Sequence "
                    "will correspond to an actual SQL Server "
                    "CREATE SEQUENCE in "
                    "a future release.  Please use the mssql_identity_start "
                    "and mssql_identity_increment parameters."
                )
            if column.default.start == 0:
                start = 0
            else:
                start = column.default.start or 1

            colspec += " IDENTITY(%s,%s)" % (
                start,
                column.default.increment or 1,
            )
        elif (
            column is column.table._autoincrement_column
            or column.autoincrement is True
        ):
            start = column.dialect_options["mssql"]["identity_start"]
            increment = column.dialect_options["mssql"]["identity_increment"]
            colspec += " IDENTITY(%s,%s)" % (start, increment)
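            # e.g. (sketch) with the default start/increment of 1/1 this
            # completes a colspec such as "id INTEGER NOT NULL IDENTITY(1,1)"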
        else:
            default = self.get_column_default_string(column)
            if default is not None:
                colspec += " DEFAULT " + default

        return colspec

    def visit_create_index(self, create, include_schema=False):
        index = create.element
        self._verify_index_table(index)
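        # an illustrative sketch of a fully-optioned statement
        # (names assumed):
        #
        #     CREATE UNIQUE NONCLUSTERED INDEX ix_t_x ON t (x)
        #         INCLUDE (y) WHERE x > 5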
        preparer = self.preparer
        text = "CREATE "
        if index.unique:
            text += "UNIQUE "

        # handle clustering option
        clustered = index.dialect_options["mssql"]["clustered"]
        if clustered is not None:
            if clustered:
                text += "CLUSTERED "
            else:
                text += "NONCLUSTERED "

        text += "INDEX %s ON %s (%s)" % (
            self._prepared_index_name(index, include_schema=include_schema),
            preparer.format_table(index.table),
            ", ".join(
                self.sql_compiler.process(
                    expr, include_table=False, literal_binds=True
                )
                for expr in index.expressions
            ),
        )

        # handle other included columns
        if index.dialect_options["mssql"]["include"]:
            inclusions = [
                index.table.c[col]
                if isinstance(col, util.string_types)
                else col
                for col in index.dialect_options["mssql"]["include"]
            ]

            text += " INCLUDE (%s)" % ", ".join(
                [preparer.quote(c.name) for c in inclusions]
            )

        whereclause = index.dialect_options["mssql"]["where"]

        if whereclause is not None:
            where_compiled = self.sql_compiler.process(
                whereclause, include_table=False, literal_binds=True
            )
            text += " WHERE " + where_compiled

        return text

    def visit_drop_index(self, drop):
        return "\nDROP INDEX %s ON %s" % (
            self._prepared_index_name(drop.element, include_schema=False),
            self.preparer.format_table(drop.element.table),
        )

    def visit_primary_key_constraint(self, constraint):
        if len(constraint) == 0:
            return ""
        text = ""
        if constraint.name is not None:
            text += "CONSTRAINT %s " % self.preparer.format_constraint(
                constraint
            )
        text += "PRIMARY KEY "

        clustered = constraint.dialect_options["mssql"]["clustered"]
        if clustered is not None:
            if clustered:
                text += "CLUSTERED "
            else:
                text += "NONCLUSTERED "

        text += "(%s)" % ", ".join(
            self.preparer.quote(c.name) for c in constraint
        )
        text += self.define_constraint_deferrability(constraint)
        return text

    def visit_unique_constraint(self, constraint):
        if len(constraint) == 0:
            return ""
        text = ""
        if constraint.name is not None:
            formatted_name = self.preparer.format_constraint(constraint)
            if formatted_name is not None:
                text += "CONSTRAINT %s " % formatted_name
        text += "UNIQUE "

        clustered = constraint.dialect_options["mssql"]["clustered"]
        if clustered is not None:
            if clustered:
                text += "CLUSTERED "
            else:
                text += "NONCLUSTERED "

        text += "(%s)" % ", ".join(
            self.preparer.quote(c.name) for c in constraint
        )
        text += self.define_constraint_deferrability(constraint)
        return text

    def visit_computed_column(self, generated):
        text = "AS (%s)" % self.sql_compiler.process(
            generated.sqltext, include_table=False, literal_binds=True
        )
        # explicitly check for True|False since None means server default
        if generated.persisted is True:
            text += " PERSISTED"
        return text


class MSIdentifierPreparer(compiler.IdentifierPreparer):
    reserved_words = RESERVED_WORDS

    def __init__(self, dialect):
        super(MSIdentifierPreparer, self).__init__(
            dialect,
            initial_quote="[",
            final_quote="]",
            quote_case_sensitive_collations=False,
        )

    def _escape_identifier(self, value):
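        # closing brackets are doubled, e.g. (sketch) "my]name" ->
        # "my]]name", which quotes as [my]]name]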
        return value.replace("]", "]]")

    def _unescape_identifier(self, value):
        return value.replace("]]", "]")

    def quote_schema(self, schema, force=None):
        """Prepare a quoted schema name, which may include a database name."""

        # need to re-implement the deprecation warning entirely
        if force is not None:
            # not using the util.deprecated_params() decorator in this
            # case because of the additional function call overhead on this
            # very performance-critical spot.
            util.warn_deprecated(
                "The IdentifierPreparer.quote_schema.force parameter is "
                "deprecated and will be removed in a future release.  This "
                "flag has no effect on the behavior of the "
                "IdentifierPreparer.quote method; please refer to "
                "quoted_name()."
            )

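        # e.g. (sketch) "mydb.dbo" quotes as "[mydb].[dbo]", while a
        # plain "dbo" quotes as "[dbo]"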
        dbname, owner = _schema_elements(schema)
        if dbname:
            result = "%s.%s" % (self.quote(dbname), self.quote(owner))
        elif owner:
            result = self.quote(owner)
        else:
            result = ""
        return result


def _db_plus_owner_listing(fn):
    def wrap(dialect, connection, schema=None, **kw):
        dbname, owner = _owner_plus_db(dialect, schema)
        return _switch_db(
            dbname,
            connection,
            fn,
            dialect,
            connection,
            dbname,
            owner,
            schema,
            **kw
        )

    return update_wrapper(wrap, fn)


def _db_plus_owner(fn):
    def wrap(dialect, connection, tablename, schema=None, **kw):
        dbname, owner = _owner_plus_db(dialect, schema)
        return _switch_db(
            dbname,
            connection,
            fn,
            dialect,
            connection,
            tablename,
            dbname,
            owner,
            schema,
            **kw
        )

    return update_wrapper(wrap, fn)


def _switch_db(dbname, connection, fn, *arg, **kw):
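    # temporarily switch to the target database around fn(), restoring
    # the original afterwards; e.g. (sketch) reflecting against schema
    # "otherdb.dbo" issues "use [otherdb]" before querying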
    if dbname:
        current_db = connection.scalar("select db_name()")
        if current_db != dbname:
            connection.execute(
                "use %s" % connection.dialect.identifier_preparer.quote(dbname)
            )
    try:
        return fn(*arg, **kw)
    finally:
        if dbname and current_db != dbname:
            connection.execute(
                "use %s"
                % connection.dialect.identifier_preparer.quote(current_db)
            )


def _owner_plus_db(dialect, schema):
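    # e.g. (sketch): None -> (None, <default schema>); "mydb.dbo" ->
    # ("mydb", "dbo"); "dbo" -> (None, "dbo")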
    if not schema:
        return None, dialect.default_schema_name
    elif "." in schema:
        return _schema_elements(schema)
    else:
        return None, schema


_memoized_schema = util.LRUCache()


def _schema_elements(schema):
    if isinstance(schema, quoted_name) and schema.quote:
        return None, schema

    if schema in _memoized_schema:
        return _memoized_schema[schema]

    # tests for this function are in:
    # test/dialect/mssql/test_reflection.py ->
    #           OwnerPlusDBTest.test_owner_database_pairs
    # test/dialect/mssql/test_compiler.py -> test_force_schema_*
    # test/dialect/mssql/test_compiler.py -> test_schema_many_tokens_*
    #
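    # e.g. (illustrative): "mydb.dbo" -> ("mydb", "dbo");
    # "[my.db].dbo" -> ("my.db", "dbo"); "dbo" -> (None, "dbo")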

    push = []
    symbol = ""
    bracket = False
    has_brackets = False
    for token in re.split(r"(\[|\]|\.)", schema):
        if not token:
            continue
        if token == "[":
            bracket = True
            has_brackets = True
        elif token == "]":
            bracket = False
        elif not bracket and token == ".":
            if has_brackets:
                push.append("[%s]" % symbol)
            else:
                push.append(symbol)
            symbol = ""
            has_brackets = False
        else:
            symbol += token
    if symbol:
        push.append(symbol)
    if len(push) > 1:
        dbname, owner = ".".join(push[0:-1]), push[-1]

        # test for internal brackets
        if re.match(r".*\].*\[.*", dbname[1:-1]):
            dbname = quoted_name(dbname, quote=False)
        else:
            dbname = dbname.lstrip("[").rstrip("]")

    elif len(push):
        dbname, owner = None, push[0]
    else:
        dbname, owner = None, None

    _memoized_schema[schema] = dbname, owner
    return dbname, owner


class MSDialect(default.DefaultDialect):
    name = "mssql"
    supports_default_values = True
    supports_empty_insert = False
    execution_ctx_cls = MSExecutionContext
    use_scope_identity = True
    max_identifier_length = 128
    schema_name = "dbo"

    colspecs = {
        sqltypes.DateTime: _MSDateTime,
        sqltypes.Date: _MSDate,
        sqltypes.Time: TIME,
        sqltypes.Unicode: _MSUnicode,
        sqltypes.UnicodeText: _MSUnicodeText,
    }

    engine_config_types = default.DefaultDialect.engine_config_types.union(
        [("legacy_schema_aliasing", util.asbool)]
    )

    ischema_names = ischema_names

    supports_native_boolean = False
    non_native_boolean_check_constraint = False
    supports_unicode_binds = True
    postfetch_lastrowid = True
    _supports_nvarchar_max = False

    server_version_info = ()

    statement_compiler = MSSQLCompiler
    ddl_compiler = MSDDLCompiler
    type_compiler = MSTypeCompiler
    preparer = MSIdentifierPreparer

    construct_arguments = [
        (sa_schema.PrimaryKeyConstraint, {"clustered": None}),
        (sa_schema.UniqueConstraint, {"clustered": None}),
        (sa_schema.Index, {"clustered": None, "include": None, "where": None}),
        (sa_schema.Column, {"identity_start": 1, "identity_increment": 1}),
    ]

    def __init__(
        self,
        query_timeout=None,
        use_scope_identity=True,
        schema_name="dbo",
        isolation_level=None,
        deprecate_large_types=None,
        legacy_schema_aliasing=False,
        **opts
    ):
        self.query_timeout = int(query_timeout or 0)
        self.schema_name = schema_name

        self.use_scope_identity = use_scope_identity
        self.deprecate_large_types = deprecate_large_types
        self.legacy_schema_aliasing = legacy_schema_aliasing

        super(MSDialect, self).__init__(**opts)

        self.isolation_level = isolation_level

    def do_savepoint(self, connection, name):
        # give the DBAPI a push; SAVE TRANSACTION requires that a
        # transaction already be in progress
        connection.execute("IF @@TRANCOUNT = 0 BEGIN TRANSACTION")
        super(MSDialect, self).do_savepoint(connection, name)

    def do_release_savepoint(self, connection, name):
        # SQL Server does not support RELEASE SAVEPOINT
        pass

    _isolation_lookup = set(
        [
            "SERIALIZABLE",
            "READ UNCOMMITTED",
            "READ COMMITTED",
            "REPEATABLE READ",
            "SNAPSHOT",
        ]
    )

    def set_isolation_level(self, connection, level):
        level = level.replace("_", " ")
        if level not in self._isolation_lookup:
            raise exc.ArgumentError(
                "Invalid value '%s' for isolation_level. "
                "Valid isolation levels for %s are %s"
                % (level, self.name, ", ".join(self._isolation_lookup))
            )
        cursor = connection.cursor()
        cursor.execute("SET TRANSACTION ISOLATION LEVEL %s" % level)
        cursor.close()
        if level == "SNAPSHOT":
            connection.commit()

    def get_isolation_level(self, connection):
        if self.server_version_info < MS_2005_VERSION:
            raise NotImplementedError(
                "Can't fetch isolation level prior to SQL Server 2005"
            )

        last_error = None

        views = ("sys.dm_exec_sessions", "sys.dm_pdw_nodes_exec_sessions")
        for view in views:
            cursor = connection.cursor()
            try:
                cursor.execute(
                    """
                  SELECT CASE transaction_isolation_level
                    WHEN 0 THEN NULL
                    WHEN 1 THEN 'READ UNCOMMITTED'
                    WHEN 2 THEN 'READ COMMITTED'
                    WHEN 3 THEN 'REPEATABLE READ'
                    WHEN 4 THEN 'SERIALIZABLE'
                    WHEN 5 THEN 'SNAPSHOT' END AS TRANSACTION_ISOLATION_LEVEL
                    FROM %s
                    where session_id = @@SPID
                  """
                    % view
                )
                val = cursor.fetchone()[0]
            except self.dbapi.Error as err:
                # Python 3 clears the exception variable when the except
                # block exits, so save it for the messages below
                last_error = err
                continue
            else:
                return val.upper()
            finally:
                cursor.close()
        else:
            # note that the NotImplementedError is caught by
            # DefaultDialect, so the warning here is all that displays
            util.warn(
                "Could not fetch transaction isolation level, "
                "tried views: %s; final error was: %s" % (views, last_error)
            )
            raise NotImplementedError(
                "Can't fetch isolation level on this particular "
                "SQL Server version. tried views: %s; final error was: %s"
                % (views, last_error)
            )

    def initialize(self, connection):
        super(MSDialect, self).initialize(connection)
        self._setup_version_attributes()
        self._setup_supports_nvarchar_max(connection)

    def on_connect(self):
        if self.isolation_level is not None:

            def connect(conn):
                self.set_isolation_level(conn, self.isolation_level)

            return connect
        else:
            return None

    def _setup_version_attributes(self):
        if self.server_version_info[0] not in list(range(8, 17)):
            util.warn(
                "Unrecognized server version info '%s'.  Some SQL Server "
                "features may not function properly."
                % ".".join(str(x) for x in self.server_version_info)
            )
        if (
            self.server_version_info >= MS_2005_VERSION
            and "implicit_returning" not in self.__dict__
        ):
            self.implicit_returning = True
        if self.server_version_info >= MS_2008_VERSION:
            self.supports_multivalues_insert = True
        if self.deprecate_large_types is None:
            self.deprecate_large_types = (
                self.server_version_info >= MS_2012_VERSION
            )

    def _setup_supports_nvarchar_max(self, connection):
        try:
            connection.scalar(
                sql.text("SELECT CAST('test max support' AS NVARCHAR(max))")
            )
        except exc.DBAPIError:
            self._supports_nvarchar_max = False
        else:
            self._supports_nvarchar_max = True

    def _get_default_schema_name(self, connection):
        if self.server_version_info < MS_2005_VERSION:
            return self.schema_name
        else:
            query = sql.text("SELECT schema_name()")
            default_schema_name = connection.scalar(query)
            if default_schema_name is not None:
                # guard against the case where the default_schema_name is
                # being fed back into a table reflection function.
                return quoted_name(default_schema_name, quote=True)
            else:
                return self.schema_name

    @_db_plus_owner
    def has_table(self, connection, tablename, dbname, owner, schema):
        columns = ischema.columns

        whereclause = columns.c.table_name == tablename

        if owner:
            whereclause = sql.and_(
                whereclause, columns.c.table_schema == owner
            )
        s = sql.select([columns], whereclause)
        c = connection.execute(s)
        return c.first() is not None

    @reflection.cache
    def get_schema_names(self, connection, **kw):
        s = sql.select(
            [ischema.schemata.c.schema_name],
            order_by=[ischema.schemata.c.schema_name],
        )
        schema_names = [r[0] for r in connection.execute(s)]
        return schema_names

    @reflection.cache
    @_db_plus_owner_listing
    def get_table_names(self, connection, dbname, owner, schema, **kw):
        tables = ischema.tables
        s = sql.select(
            [tables.c.table_name],
            sql.and_(
                tables.c.table_schema == owner,
                tables.c.table_type == "BASE TABLE",
            ),
            order_by=[tables.c.table_name],
        )
        table_names = [r[0] for r in connection.execute(s)]
        return table_names

    @reflection.cache
    @_db_plus_owner_listing
    def get_view_names(self, connection, dbname, owner, schema, **kw):
        tables = ischema.tables
        s = sql.select(
            [tables.c.table_name],
            sql.and_(
                tables.c.table_schema == owner, tables.c.table_type == "VIEW"
            ),
            order_by=[tables.c.table_name],
        )
        view_names = [r[0] for r in connection.execute(s)]
        return view_names

    @reflection.cache
    @_db_plus_owner
    def get_indexes(self, connection, tablename, dbname, owner, schema, **kw):
        # index reflection here uses the system catalogs, which are
        # not available below SQL Server 2005
        if self.server_version_info < MS_2005_VERSION:
            return []

        rp = connection.execute(
            sql.text(
                "select ind.index_id, ind.is_unique, ind.name "
                "from sys.indexes as ind join sys.tables as tab on "
                "ind.object_id=tab.object_id "
                "join sys.schemas as sch on sch.schema_id=tab.schema_id "
                "where tab.name = :tabname "
                "and sch.name=:schname "
                "and ind.is_primary_key=0 and ind.type != 0"
            )
            .bindparams(
                sql.bindparam("tabname", tablename, ischema.CoerceUnicode()),
                sql.bindparam("schname", owner, ischema.CoerceUnicode()),
            )
            .columns(name=sqltypes.Unicode())
        )
        indexes = {}
        for row in rp:
            indexes[row["index_id"]] = {
                "name": row["name"],
                "unique": row["is_unique"] == 1,
                "column_names": [],
            }
        rp = connection.execute(
            sql.text(
                "select ind_col.index_id, ind_col.object_id, col.name "
                "from sys.columns as col "
                "join sys.tables as tab on tab.object_id=col.object_id "
                "join sys.index_columns as ind_col on "
                "(ind_col.column_id=col.column_id and "
                "ind_col.object_id=tab.object_id) "
                "join sys.schemas as sch on sch.schema_id=tab.schema_id "
                "where tab.name=:tabname "
                "and sch.name=:schname"
            )
            .bindparams(
                sql.bindparam("tabname", tablename, ischema.CoerceUnicode()),
                sql.bindparam("schname", owner, ischema.CoerceUnicode()),
            )
            .columns(name=sqltypes.Unicode())
        )
        for row in rp:
            if row["index_id"] in indexes:
                indexes[row["index_id"]]["column_names"].append(row["name"])

        return list(indexes.values())

    @reflection.cache
    @_db_plus_owner
    def get_view_definition(
        self, connection, viewname, dbname, owner, schema, **kw
    ):
        rp = connection.execute(
            sql.text(
                "select definition from sys.sql_modules as mod, "
                "sys.views as views, "
                "sys.schemas as sch"
                " where "
                "mod.object_id=views.object_id and "
                "views.schema_id=sch.schema_id and "
                "views.name=:viewname and sch.name=:schname"
            ).bindparams(
                sql.bindparam("viewname", viewname, ischema.CoerceUnicode()),
                sql.bindparam("schname", owner, ischema.CoerceUnicode()),
            )
        )

        if rp:
            view_def = rp.scalar()
            return view_def

    @reflection.cache
    @_db_plus_owner
    def get_columns(self, connection, tablename, dbname, owner, schema, **kw):
        # Get base columns
        columns = ischema.columns
        computed_cols = ischema.computed_columns
        if owner:
            whereclause = sql.and_(
                columns.c.table_name == tablename,
                columns.c.table_schema == owner,
            )
            table_fullname = "%s.%s" % (owner, tablename)
            full_name = columns.c.table_schema + "." + columns.c.table_name
            join_on = computed_cols.c.object_id == func.object_id(full_name)
        else:
            whereclause = columns.c.table_name == tablename
            table_fullname = tablename
            join_on = computed_cols.c.object_id == func.object_id(
                columns.c.table_name
            )

        join_on = sql.and_(
            join_on, columns.c.column_name == computed_cols.c.name
        )
        join = columns.join(computed_cols, onclause=join_on, isouter=True)

        if self._supports_nvarchar_max:
            computed_definition = computed_cols.c.definition
        else:
            # tds_version 4.2 does not support NVARCHAR(MAX)
            computed_definition = sql.cast(
                computed_cols.c.definition, NVARCHAR(4000)
            )

        s = sql.select(
            [columns, computed_definition, computed_cols.c.is_persisted],
            whereclause,
            from_obj=join,
            order_by=[columns.c.ordinal_position],
        )

        c = connection.execute(s)
        cols = []

        while True:
            row = c.fetchone()
            if row is None:
                break
            name = row[columns.c.column_name]
            type_ = row[columns.c.data_type]
            nullable = row[columns.c.is_nullable] == "YES"
            charlen = row[columns.c.character_maximum_length]
            numericprec = row[columns.c.numeric_precision]
            numericscale = row[columns.c.numeric_scale]
            default = row[columns.c.column_default]
            collation = row[columns.c.collation_name]
            definition = row[computed_definition]
            is_persisted = row[computed_cols.c.is_persisted]

            coltype = self.ischema_names.get(type_, None)

            kwargs = {}
            if coltype in (
                MSString,
                MSChar,
                MSNVarchar,
                MSNChar,
                MSText,
                MSNText,
                MSBinary,
                MSVarBinary,
                sqltypes.LargeBinary,
            ):
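                # SQL Server reports the length of "max" types, e.g.
                # VARCHAR(max), as -1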
                if charlen == -1:
                    charlen = None
                kwargs["length"] = charlen
                if collation:
                    kwargs["collation"] = collation

            if coltype is None:
                util.warn(
                    "Did not recognize type '%s' of column '%s'"
                    % (type_, name)
                )
                coltype = sqltypes.NULLTYPE
            else:
                if issubclass(coltype, sqltypes.Numeric):
                    kwargs["precision"] = numericprec

                    if not issubclass(coltype, sqltypes.Float):
                        kwargs["scale"] = numericscale

                coltype = coltype(**kwargs)
            cdict = {
                "name": name,
                "type": coltype,
                "nullable": nullable,
                "default": default,
                "autoincrement": False,
            }

            if definition is not None and is_persisted is not None:
                cdict["computed"] = {
                    "sqltext": definition,
                    "persisted": is_persisted,
                }

            cols.append(cdict)
        # autoincrement and identity
        colmap = {}
        for col in cols:
            colmap[col["name"]] = col
        # We also run sp_columns to check for identity columns:
        cursor = connection.execute(
            sql.text(
                "EXEC sp_columns @table_name = :table_name, "
                "@table_owner = :table_owner",
            ),
            {"table_name": tablename, "table_owner": owner},
        )
        ic = None
        while True:
            row = cursor.fetchone()
            if row is None:
                break
            (col_name, type_name) = row[3], row[5]
            if type_name.endswith("identity") and col_name in colmap:
                ic = col_name
                colmap[col_name]["autoincrement"] = True
                colmap[col_name]["dialect_options"] = {
                    "mssql_identity_start": 1,
                    "mssql_identity_increment": 1,
                }
                break
        cursor.close()

        if ic is not None and self.server_version_info >= MS_2005_VERSION:
            table_fullname = "%s.%s" % (owner, tablename)
            cursor = connection.execute(
                "select ident_seed('%s'), ident_incr('%s')"
                % (table_fullname, table_fullname)
            )

            row = cursor.first()
            if row is not None and row[0] is not None:
                colmap[ic]["dialect_options"].update(
                    {
                        "mssql_identity_start": int(row[0]),
                        "mssql_identity_increment": int(row[1]),
                    }
                )
        return cols

    @reflection.cache
    @_db_plus_owner
    def get_pk_constraint(
        self, connection, tablename, dbname, owner, schema, **kw
    ):
        pkeys = []
        TC = ischema.constraints
        C = ischema.key_constraints.alias("C")

        # Primary key constraints
        s = sql.select(
            [C.c.column_name, TC.c.constraint_type, C.c.constraint_name],
            sql.and_(
                TC.c.constraint_name == C.c.constraint_name,
                TC.c.table_schema == C.c.table_schema,
                C.c.table_name == tablename,
                C.c.table_schema == owner,
            ),
        ).order_by(TC.c.constraint_name, C.c.ordinal_position)
        c = connection.execute(s)
        constraint_name = None
        for row in c:
            if "PRIMARY" in row[TC.c.constraint_type.name]:
                pkeys.append(row[0])
                if constraint_name is None:
                    constraint_name = row[C.c.constraint_name.name]
        return {"constrained_columns": pkeys, "name": constraint_name}

    @reflection.cache
    @_db_plus_owner
    def get_foreign_keys(
        self, connection, tablename, dbname, owner, schema, **kw
    ):
        RR = ischema.ref_constraints
        C = ischema.key_constraints.alias("C")
        R = ischema.key_constraints.alias("R")

        # Foreign key constraints
        s = sql.select(
            [
                C.c.column_name,
                R.c.table_schema,
                R.c.table_name,
                R.c.column_name,
                RR.c.constraint_name,
                RR.c.match_option,
                RR.c.update_rule,
                RR.c.delete_rule,
            ],
            sql.and_(
                C.c.table_name == tablename,
                C.c.table_schema == owner,
                RR.c.constraint_schema == C.c.table_schema,
                C.c.constraint_name == RR.c.constraint_name,
                R.c.constraint_name == RR.c.unique_constraint_name,
                R.c.constraint_schema == RR.c.unique_constraint_schema,
                C.c.ordinal_position == R.c.ordinal_position,
            ),
            order_by=[RR.c.constraint_name, R.c.ordinal_position],
        )

        # group rows by constraint ID, to handle multi-column FKs

        def fkey_rec():
            return {
                "name": None,
                "constrained_columns": [],
                "referred_schema": None,
                "referred_table": None,
                "referred_columns": [],
            }

        fkeys = util.defaultdict(fkey_rec)

        for r in connection.execute(s).fetchall():
            scol, rschema, rtbl, rcol, rfknm, fkmatch, fkuprule, fkdelrule = r

            rec = fkeys[rfknm]
            rec["name"] = rfknm
            if not rec["referred_table"]:
                rec["referred_table"] = rtbl
                if schema is not None or owner != rschema:
                    if dbname:
                        rschema = dbname + "." + rschema
                    rec["referred_schema"] = rschema

            local_cols, remote_cols = (
                rec["constrained_columns"],
                rec["referred_columns"],
            )

            local_cols.append(scol)
            remote_cols.append(rcol)

        return list(fkeys.values())
