1# mysql/pyodbc.py
2# Copyright (C) 2005-2021 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: http://www.opensource.org/licenses/mit-license.php
7
8r"""
9
10
11.. dialect:: mysql+pyodbc
12    :name: PyODBC
13    :dbapi: pyodbc
14    :connectstring: mysql+pyodbc://<username>:<password>@<dsnname>
15    :url: http://pypi.python.org/pypi/pyodbc/
16
17    .. note:: The PyODBC for MySQL dialect is not well supported, and
18       is subject to unresolved character encoding issues
19       which exist within the current ODBC drivers available.
20       (see http://code.google.com/p/pyodbc/issues/detail?id=25).
21       Other dialects for MySQL are recommended.
22
23Pass through exact pyodbc connection string::
24
25    import urllib
26    connection_string = (
27        'DRIVER=MySQL ODBC 8.0 ANSI Driver;'
28        'SERVER=localhost;'
29        'PORT=3307;'
30        'DATABASE=mydb;'
31        'UID=root;'
32        'PWD=(whatever);'
33        'charset=utf8mb4;'
34    )
35    params = urllib.parse.quote_plus(connection_string)
36    connection_uri = "mysql+pyodbc:///?odbc_connect=%s" % params
37
38"""  # noqa
39
40import re
41
42from .base import MySQLDialect
43from .base import MySQLExecutionContext
44from .types import TIME
45from ... import util
46from ...connectors.pyodbc import PyODBCConnector
47from ...sql.sqltypes import Time
48
49
50class _pyodbcTIME(TIME):
51    def result_processor(self, dialect, coltype):
52        def process(value):
53            # pyodbc returns a datetime.time object; no need to convert
54            return value
55
56        return process
57
58
59class MySQLExecutionContext_pyodbc(MySQLExecutionContext):
60    def get_lastrowid(self):
61        cursor = self.create_cursor()
62        cursor.execute("SELECT LAST_INSERT_ID()")
63        lastrowid = cursor.fetchone()[0]
64        cursor.close()
65        return lastrowid
66
67
68class MySQLDialect_pyodbc(PyODBCConnector, MySQLDialect):
69    colspecs = util.update_copy(MySQLDialect.colspecs, {Time: _pyodbcTIME})
70    supports_unicode_statements = False
71    execution_ctx_cls = MySQLExecutionContext_pyodbc
72
73    pyodbc_driver_name = "MySQL"
74
75    def __init__(self, **kw):
76        # deal with http://code.google.com/p/pyodbc/issues/detail?id=25
77        kw.setdefault("convert_unicode", True)
78        super(MySQLDialect_pyodbc, self).__init__(**kw)
79
80    def _detect_charset(self, connection):
81        """Sniff out the character set in use for connection results."""
82
83        # Prefer 'character_set_results' for the current connection over the
84        # value in the driver.  SET NAMES or individual variable SETs will
85        # change the charset without updating the driver's view of the world.
86        #
87        # If it's decided that issuing that sort of SQL leaves you SOL, then
88        # this can prefer the driver value.
89        rs = connection.execute("SHOW VARIABLES LIKE 'character_set%%'")
90        opts = {row[0]: row[1] for row in self._compat_fetchall(rs)}
91        for key in ("character_set_connection", "character_set"):
92            if opts.get(key, None):
93                return opts[key]
94
95        util.warn(
96            "Could not detect the connection character set.  "
97            "Assuming latin1."
98        )
99        return "latin1"
100
101    def _extract_error_code(self, exception):
102        m = re.compile(r"\((\d+)\)").search(str(exception.args))
103        c = m.group(1)
104        if c:
105            return int(c)
106        else:
107            return None
108
109
110dialect = MySQLDialect_pyodbc
111