1# This file is part of Buildbot.  Buildbot is free software: you can
2# redistribute it and/or modify it under the terms of the GNU General Public
3# License as published by the Free Software Foundation, version 2.
4#
5# This program is distributed in the hope that it will be useful, but WITHOUT
6# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
7# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
8# details.
9#
10# You should have received a copy of the GNU General Public License along with
11# this program; if not, write to the Free Software Foundation, Inc., 51
12# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
13#
14# Copyright Buildbot Team Members
15
16"""
17A wrapper around `sqlalchemy.create_engine` that handles all of the
18special cases that Buildbot needs.  Those include:
19
 - pool_recycle for MySQL
 - %(basedir)s substitution in SQLite database paths
 - optimal thread pool size calculation
 - per-driver connection setup and retry strategies
23
24"""
25
26
27import os
28
29import sqlalchemy as sa
30from sqlalchemy.engine import url
31from sqlalchemy.pool import NullPool
32
33from twisted.python import log
34
35from buildbot.util import sautils
36
37# from http://www.mail-archive.com/sqlalchemy@googlegroups.com/msg15079.html
38
39
class ReconnectingListener:
    """Simple holder object carrying a ``retried`` flag.

    NOTE(review): not referenced elsewhere in this module; presumably kept
    for external users -- confirm before removing.
    """

    def __init__(self):
        super().__init__()
        # no retry has been attempted yet
        self.retried = False
44
45
class Strategy:
    """Default per-driver hooks used by ``create_engine``.

    Subclasses override ``set_up`` to configure a freshly created engine and
    ``should_retry`` to decide which operational errors are retryable.
    """

    def set_up(self, u, engine):
        """Configure *engine* for the URL *u*; the default does nothing."""

    def should_retry(self, operational_error):
        """Return True if *operational_error* looks like a transient failure.

        The error's first positional argument is searched for the substrings
        ``'Lost connection'`` and ``'database is locked'``.  If that argument
        cannot be inspected (no args, not a container, ...), False is returned.
        """
        transient_markers = ('Lost connection', 'database is locked')
        try:
            message = operational_error.args[0]
            return any(marker in message for marker in transient_markers)
        except Exception:
            return False
57
58
class SqlLiteStrategy(Strategy):
    """Driver strategy for SQLite engines."""

    def set_up(self, u, engine):
        """Special setup for sqlite engines.

        Registers pool ``connect`` listeners on *engine* and, for a
        file-backed database (``u.database`` set), turns off
        ``checkpoint_fullfsync`` on every new connection and attempts to
        switch the journal mode to WAL.  A failure to set the journal mode
        is logged, not raised.
        """
        def connect_listener_enable_fk(connection, record):
            # Foreign-key enforcement is intentionally not switched on here;
            # see http://trac.buildbot.net/ticket/3490#ticket
            # (the disabled statement was:
            #  connection.execute('pragma foreign_keys=ON'))
            if not getattr(engine, "fk_disabled", False):
                return

        sa.event.listen(engine.pool, 'connect', connect_listener_enable_fk)

        if not u.database:
            # in-memory database: no WAL setup
            return

        def disable_checkpoint_fullfsync(connection, record):
            connection.execute("pragma checkpoint_fullfsync = off")

        sa.event.listen(engine.pool, 'connect', disable_checkpoint_fullfsync)

        log.msg("setting database journal mode to 'wal'")
        try:
            # NOTE(review): Engine.execute is not available in SQLAlchemy
            # 2.0; there this lands in the except branch -- confirm.
            engine.execute("pragma journal_mode = wal")
        except Exception:
            log.msg("failed to set journal mode - database may fail")
82
83
class MySQLStrategy(Strategy):
    """Driver strategy for MySQL engines.

    Pings each pooled connection on checkout so dead connections get
    replaced, and treats disconnect and deadlock error codes as retryable.
    """

    # MySQL error codes (first element of a driver exception's ``args``)
    # treated as "the connection is unusable".
    disconnect_error_codes = (2006, 2013, 2014, 2045, 2055)
    # MySQL error codes treated as deadlocks (1213: ER_LOCK_DEADLOCK).
    deadlock_error_codes = (1213,)

    def in_error_codes(self, args, error_codes):
        """Return True if ``args[0]`` is one of *error_codes*.

        Empty or ``None`` *args* yield False.
        """
        if args:
            return args[0] in error_codes
        return False

    def is_disconnect(self, args):
        """Return True if *args* carries a disconnect error code."""
        return self.in_error_codes(args, self.disconnect_error_codes)

    def is_deadlock(self, args):
        """Return True if *args* carries a deadlock error code."""
        return self.in_error_codes(args, self.deadlock_error_codes)

    def set_up(self, u, engine):
        """Special setup for mysql engines.

        Installs a pool ``checkout`` listener that runs ``SELECT 1`` on the
        raw DB-API connection.  If that fails with a disconnect error code,
        ``sa.exc.DisconnectionError`` is raised so SQLAlchemy re-creates the
        connection; any other operational error is logged and re-raised.
        This adds a measure of safety over the ``pool_recycle`` parameter,
        e.g. when the mysql server goes away.
        """
        def checkout_listener(dbapi_con, con_record, con_proxy):
            try:
                cursor = dbapi_con.cursor()
                cursor.execute("SELECT 1")
            except dbapi_con.OperationalError as ex:
                if self.is_disconnect(ex.args):
                    # sqlalchemy will re-create the connection
                    log.msg('connection will be removed')
                    raise sa.exc.DisconnectionError()
                log.msg('exception happened {}'.format(ex))
                raise
            # release the ping cursor explicitly instead of leaving it open
            cursor.close()

        # older versions of sqlalchemy require the listener to be specified
        # in the kwargs, in a class instance
        if sautils.sa_version() < (0, 7, 0):
            # plain object exposing the hook as its ``checkout`` attribute;
            # named so it does not shadow the module-level
            # ReconnectingListener
            class _LegacyCheckoutListener:
                pass
            rcl = _LegacyCheckoutListener()
            rcl.checkout = checkout_listener
            engine.pool.add_listener(rcl)
        else:
            sa.event.listen(engine.pool, 'checkout', checkout_listener)

    def should_retry(self, ex):
        """Return True if *ex* is worth retrying.

        Checks the wrapped driver exception (``ex.orig``) for disconnect or
        deadlock error codes, then falls back to the generic message checks
        of ``Strategy.should_retry``.  If *ex* has no ``orig`` (or it has no
        ``args``), only the generic check applies.
        """
        orig_args = getattr(getattr(ex, 'orig', None), 'args', None)
        return (self.is_disconnect(orig_args) or
                self.is_deadlock(orig_args) or
                super().should_retry(ex))
133
134
def sa_url_set_attr(u, attr, value):
    """Return a URL equal to *u* but with attribute *attr* set to *value*.

    URL objects that provide ``set()`` (immutable URLs in newer SQLAlchemy)
    return a modified copy from it; otherwise *u* is mutated in place and
    returned.
    """
    if not hasattr(u, 'set'):
        # legacy, mutable URL object
        setattr(u, attr, value)
        return u
    return u.set(**{attr: value})
140
141
def special_case_sqlite(u, kwargs):
    """Adjust an SQLite URL and engine kwargs for Buildbot.

    For a file-backed database (``u.database`` set):
      - ``poolclass`` defaults to ``NullPool``;
      - ``%(basedir)s`` in the path is replaced by ``kwargs['basedir']``;
      - a relative path is resolved against ``kwargs['basedir']``.
    For an in-memory database, ``check_same_thread`` is turned off in
    ``connect_args``.  In both cases a ``serialize_access`` query parameter
    is removed from the URL.

    Returns a ``(u, kwargs, max_conns)`` tuple; ``max_conns`` (the optimal
    thread pool size) is always 1 for SQLite.
    """
    max_conns = 1

    # when given a database path, stick the basedir in there
    if u.database:

        # Use NullPool instead of the sqlalchemy-0.6.8-default
        # SingletonThreadPool for sqlite to suppress the error in
        # http://groups.google.com/group/sqlalchemy/msg/f8482e4721a89589,
        # which also explains that NullPool is the new default in
        # sqlalchemy 0.7 for non-memory SQLite databases.
        kwargs.setdefault('poolclass', NullPool)

        basedir = kwargs['basedir']
        database = u.database % dict(basedir=basedir)
        # Test the whole path: looking only at its first character misjudges
        # Windows drive paths (C:\...) and fails on an empty path.
        if not os.path.isabs(database):
            database = os.path.join(basedir, database)

        u = sa_url_set_attr(u, 'database', database)

    else:
        # For in-memory database SQLAlchemy will use SingletonThreadPool
        # and we will run connection creation and all queries in the single
        # thread.
        # However connection destruction will be run from the main
        # thread, which is safe in our case, but not safe in general,
        # so SQLite will emit warning about it.
        # Silence that warning.
        kwargs.setdefault('connect_args', {})['check_same_thread'] = False

    # ignore serializing access to the db
    if 'serialize_access' in u.query:
        query = dict(u.query)
        query.pop('serialize_access')
        u = sa_url_set_attr(u, 'query', query)

    return u, kwargs, max_conns
182
183
def special_case_mysql(u, kwargs):
    """Adjust a MySQL URL and engine kwargs for Buildbot.

    - ``max_idle`` (default 3600) is taken out of the URL query and used as
      ``pool_recycle``;
    - ``storage_engine`` (default ``MyISAM``) is taken out of the query and
      applied through an ``init_command`` in ``connect_args``, merged into
      (a copy of) any ``connect_args`` the caller supplied;
    - ``use_unicode`` and ``charset`` are forced to ``"True"`` and
      ``"utf8"``.

    Raises:
        TypeError: if ``use_unicode`` or ``charset`` is set to anything else.
        ValueError: if ``max_idle`` is not an integer.

    Returns ``(u, kwargs, None)``; ``None`` makes the caller derive the
    thread pool size from the pool settings.
    """
    query = dict(u.query)

    kwargs['pool_recycle'] = int(query.pop('max_idle', 3600))

    # default to the MyISAM storage engine
    storage_engine = query.pop('storage_engine', 'MyISAM')

    # Merge with caller-supplied connect_args instead of discarding them.
    # NOTE(review): storage_engine comes from the DB URL and is interpolated
    # into SQL unescaped; it is assumed to be trusted configuration.
    connect_args = dict(kwargs.get('connect_args') or {})
    connect_args['init_command'] = \
        'SET default_storage_engine={}'.format(storage_engine)
    kwargs['connect_args'] = connect_args

    if 'use_unicode' in query:
        if query['use_unicode'] != "True":
            raise TypeError("Buildbot requires use_unicode=True " +
                            "(and adds it automatically)")
    else:
        query['use_unicode'] = "True"

    if 'charset' in query:
        if query['charset'] != "utf8":
            raise TypeError("Buildbot requires charset=utf8 " +
                            "(and adds it automatically)")
    else:
        query['charset'] = 'utf8'

    u = sa_url_set_attr(u, 'query', query)

    return u, kwargs, None
217
218
def get_drivers_strategy(drivername):
    """Return a new driver strategy instance for *drivername*.

    Driver names starting with ``sqlite`` or ``mysql`` get their dedicated
    strategy; anything else gets the generic ``Strategy``.
    """
    prefix_table = (
        ('sqlite', SqlLiteStrategy),
        ('mysql', MySQLStrategy),
    )
    for prefix, strategy_cls in prefix_table:
        if drivername.startswith(prefix):
            return strategy_cls()
    return Strategy()
225
226
def create_engine(name_or_url, **kwargs):
    """Create a SQLAlchemy engine with Buildbot's driver-specific tweaks.

    ``basedir`` must be passed as a keyword argument.  The driver special
    cases use it, it is stripped before the remaining keyword arguments
    are forwarded to ``sa.create_engine``, and it is kept on the engine as
    ``buildbot_basedir``.

    The returned engine is also annotated with:
      - ``should_retry``: the driver strategy's retry predicate;
      - ``optimal_thread_pool_size``: used by DBConnector to size its thread
        pool (1 for SQLite, otherwise ``pool_size`` + ``max_overflow`` with
        defaults 5 and 10).

    Raises:
        TypeError: if ``basedir`` is not supplied.
    """
    if 'basedir' not in kwargs:
        raise TypeError('no basedir supplied to create_engine')

    sa_url = url.make_url(name_or_url)
    pool_threads = None
    if sa_url.drivername.startswith('sqlite'):
        sa_url, kwargs, pool_threads = special_case_sqlite(sa_url, kwargs)
    elif sa_url.drivername.startswith('mysql'):
        sa_url, kwargs, pool_threads = special_case_mysql(sa_url, kwargs)

    # sqlalchemy does not understand basedir, so take it out of kwargs
    basedir = kwargs.pop('basedir')

    if pool_threads is None:
        # derive the size from the pool parameters
        pool_threads = (kwargs.get('pool_size', 5) +
                        kwargs.get('max_overflow', 10))

    strategy = get_drivers_strategy(sa_url.drivername)
    engine = sa.create_engine(sa_url, **kwargs)
    strategy.set_up(sa_url, engine)

    engine.should_retry = strategy.should_retry
    engine.optimal_thread_pool_size = pool_threads
    engine.buildbot_basedir = basedir
    return engine
259