# This file is part of Buildbot. Buildbot is free software: you can
# redistribute it and/or modify it under the terms of the GNU General Public
# License as published by the Free Software Foundation, version 2.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Copyright Buildbot Team Members

"""
A wrapper around `sqlalchemy.create_engine` that handles all of the
special cases that Buildbot needs. Those include:

 - pool_recycle for MySQL
 - %(basedir)s substitution
 - optimal thread pool size calculation

"""


import os

import sqlalchemy as sa
from sqlalchemy.engine import url
from sqlalchemy.pool import NullPool

from twisted.python import log

from buildbot.util import sautils

# from http://www.mail-archive.com/sqlalchemy@googlegroups.com/msg15079.html


class ReconnectingListener:

    def __init__(self):
        self.retried = False


class Strategy:

    def set_up(self, u, engine):
        pass

    def should_retry(self, operational_error):
        try:
            text = operational_error.args[0]
            return 'Lost connection' in text or 'database is locked' in text
        except Exception:
            return False


class SqlLiteStrategy(Strategy):

    def set_up(self, u, engine):
        """Special setup for sqlite engines"""
        def connect_listener_enable_fk(connection, record):
            # fk must be enabled for all connections, but enforcement is
            # currently skipped; see http://trac.buildbot.net/ticket/3490#ticket
            if not getattr(engine, "fk_disabled", False):
                return
                # connection.execute('pragma foreign_keys=ON')

        sa.event.listen(engine.pool, 'connect', connect_listener_enable_fk)

        # try to enable WAL logging
        if u.database:
            def connect_listener(connection, record):
                connection.execute("pragma checkpoint_fullfsync = off")

            sa.event.listen(engine.pool, 'connect', connect_listener)

            log.msg("setting database journal mode to 'wal'")
            try:
                engine.execute("pragma journal_mode = wal")
            except Exception:
                log.msg("failed to set journal mode - database may fail")
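

# Illustrative sketch only (not used by Buildbot itself): how a strategy can
# be applied to a standalone engine. The in-memory sqlite URL is an assumption
# chosen so the example needs no files on disk; real callers go through
# create_engine() at the bottom of this module instead.
def _example_apply_sqlite_strategy():
    u = url.make_url('sqlite://')
    engine = sa.create_engine(u)
    # registers the pool 'connect' listeners; the WAL setup is skipped
    # because an in-memory database has no u.database
    SqlLiteStrategy().set_up(u, engine)
    return engine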


class MySQLStrategy(Strategy):
    disconnect_error_codes = (2006, 2013, 2014, 2045, 2055)
    deadlock_error_codes = (1213,)

    def in_error_codes(self, args, error_codes):
        if args:
            return args[0] in error_codes
        return False

    def is_disconnect(self, args):
        return self.in_error_codes(args, self.disconnect_error_codes)

    def is_deadlock(self, args):
        return self.in_error_codes(args, self.deadlock_error_codes)

    def set_up(self, u, engine):
        """Special setup for mysql engines"""
        # Add the reconnecting PoolListener that will detect a
        # disconnected connection and automatically start a new
        # one. This provides a measure of additional safety over
        # the pool_recycle parameter, and is useful when, e.g.,
        # the MySQL server goes away.
        def checkout_listener(dbapi_con, con_record, con_proxy):
            try:
                cursor = dbapi_con.cursor()
                cursor.execute("SELECT 1")
            except dbapi_con.OperationalError as ex:
                if self.is_disconnect(ex.args):
                    # sqlalchemy will re-create the connection
                    log.msg('connection will be removed')
                    raise sa.exc.DisconnectionError()
                log.msg('exception happened {}'.format(ex))
                raise

        # older versions of sqlalchemy require the listener to be specified
        # in the kwargs, in a class instance
        if sautils.sa_version() < (0, 7, 0):
            class ReconnectingListener:
                pass
            rcl = ReconnectingListener()
            rcl.checkout = checkout_listener
            engine.pool.add_listener(rcl)
        else:
            sa.event.listen(engine.pool, 'checkout', checkout_listener)

    def should_retry(self, ex):
        return any([self.is_disconnect(ex.orig.args),
                    self.is_deadlock(ex.orig.args),
                    super().should_retry(ex)])


def sa_url_set_attr(u, attr, value):
    if hasattr(u, 'set'):
        return u.set(**{attr: value})
    setattr(u, attr, value)
    return u


def special_case_sqlite(u, kwargs):
    """For sqlite, percent-substitute %(basedir)s and use a full
    path to the basedir. If using a memory database, force the
    pool size to be 1."""
    max_conns = 1

    # when given a database path, stick the basedir in there
    if u.database:

        # Use NullPool instead of the sqlalchemy-0.6.8-default
        # SingletonThreadPool for sqlite to suppress the error in
        # http://groups.google.com/group/sqlalchemy/msg/f8482e4721a89589,
        # which also explains that NullPool is the new default in
        # sqlalchemy 0.7 for non-memory SQLite databases.
        kwargs.setdefault('poolclass', NullPool)

        database = u.database
        database = database % dict(basedir=kwargs['basedir'])
        if not os.path.isabs(database[0]):
            database = os.path.join(kwargs['basedir'], database)

        u = sa_url_set_attr(u, 'database', database)

    else:
        # For in-memory database SQLAlchemy will use SingletonThreadPool
        # and we will run connection creation and all queries in the single
        # thread.
        # However connection destruction will be run from the main
        # thread, which is safe in our case, but not safe in general,
        # so SQLite will emit a warning about it.
        # Silence that warning.
        kwargs.setdefault('connect_args', {})['check_same_thread'] = False

    # ignore serializing access to the db
    if 'serialize_access' in u.query:
        query = dict(u.query)
        query.pop('serialize_access')
        u = sa_url_set_attr(u, 'query', query)

    return u, kwargs, max_conns
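

# Illustrative sketch only (not called anywhere in Buildbot): roughly what the
# sqlite special-casing above does to a relative database path. The URL and
# basedir below are made-up values for the example.
def _example_special_case_sqlite():
    u = url.make_url("sqlite:///state.sqlite")
    kwargs = {'basedir': '/home/buildbot/master'}
    u, kwargs, max_conns = special_case_sqlite(u, kwargs)
    # u.database is now '/home/buildbot/master/state.sqlite', kwargs asks for
    # NullPool, and max_conns (the optimal thread pool size) is 1
    return u, kwargs, max_conns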


def special_case_mysql(u, kwargs):
    """For mysql, take max_idle out of the query arguments, and
    use its value for pool_recycle. Also, force use_unicode and
    charset to be True and 'utf8', failing if they were set to
    anything else."""
    query = dict(u.query)

    kwargs['pool_recycle'] = int(query.pop('max_idle', 3600))

    # default to the MyISAM storage engine
    storage_engine = query.pop('storage_engine', 'MyISAM')

    kwargs['connect_args'] = {
        'init_command': 'SET default_storage_engine={}'.format(storage_engine)
    }

    if 'use_unicode' in query:
        if query['use_unicode'] != "True":
            raise TypeError("Buildbot requires use_unicode=True "
                            "(and adds it automatically)")
    else:
        query['use_unicode'] = "True"

    if 'charset' in query:
        if query['charset'] != "utf8":
            raise TypeError("Buildbot requires charset=utf8 "
                            "(and adds it automatically)")
    else:
        query['charset'] = 'utf8'

    u = sa_url_set_attr(u, 'query', query)

    return u, kwargs, None


def get_drivers_strategy(drivername):
    if drivername.startswith('sqlite'):
        return SqlLiteStrategy()
    elif drivername.startswith('mysql'):
        return MySQLStrategy()
    return Strategy()


def create_engine(name_or_url, **kwargs):
    if 'basedir' not in kwargs:
        raise TypeError('no basedir supplied to create_engine')

    max_conns = None

    # apply special cases
    u = url.make_url(name_or_url)
    if u.drivername.startswith('sqlite'):
        u, kwargs, max_conns = special_case_sqlite(u, kwargs)
    elif u.drivername.startswith('mysql'):
        u, kwargs, max_conns = special_case_mysql(u, kwargs)

    # remove the basedir as it may confuse sqlalchemy
    basedir = kwargs.pop('basedir')

    # calculate the maximum number of connections from the pool parameters,
    # if it hasn't already been specified
    if max_conns is None:
        max_conns = kwargs.get('pool_size', 5) + kwargs.get('max_overflow', 10)

    driver_strategy = get_drivers_strategy(u.drivername)
    engine = sa.create_engine(u, **kwargs)
    driver_strategy.set_up(u, engine)
    engine.should_retry = driver_strategy.should_retry
    # annotate the engine with the optimal thread pool size; this is used
    # by DBConnector to configure the surrounding thread pool
    engine.optimal_thread_pool_size = max_conns

    # keep the basedir
    engine.buildbot_basedir = basedir
    return engine
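

# Illustrative sketch only (guarded so it does not run on import): a minimal
# manual smoke test showing how a caller is expected to use create_engine().
# The sqlite URL and the basedir path are placeholders for the example.
if __name__ == '__main__':
    example_engine = create_engine('sqlite:///state.sqlite',
                                   basedir='/tmp/buildbot-example')
    # for sqlite the optimal thread pool size is forced to 1, and the basedir
    # is kept on the engine for later use by the caller
    print(example_engine.optimal_thread_pool_size)
    print(example_engine.buildbot_basedir)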