1# $Id: dbpool.py,v 3e2fedadb6ed 2015/11/10 12:52:17 jon $ 2 3import weakref as _weakref 4import Queue as _Queue 5import thread as _thread 6import time as _time 7import atexit as _atexit 8 9 10_log_level = 0 11_log_name = "/tmp/dbpool.log" 12_log_file = None 13_log_lock = _thread.allocate_lock() 14 15apilevel = "2.0" 16threadsafety = 2 17 18_dbmod = None 19_lock = _thread.allocate_lock() 20_refs = {} 21 22_COPY_ATTRS = ("paramstyle", "Warning", "Error", "InterfaceError", 23 "DatabaseError", "DataError", "OperationalError", "IntegrityError", 24 "InternalError", "ProgrammingError", "NotSupportedError") 25 26 27def _log(level, message, *args, **kwargs): 28 global _log_file 29 30 if _log_level >= level: 31 if args or kwargs: 32 argslist = [repr(arg) for arg in args] 33 argslist.extend("%s=%r" % item for item in kwargs.items()) 34 message += "(" + ", ".join(argslist) + ")" 35 _log_lock.acquire() 36 try: 37 if not _log_file: 38 _log_file = open(_log_name, "a", 1) 39 _log_file.write("%s %s\n" % (_time.strftime("%b %d %H:%M:%S"), message)) 40 finally: 41 _log_lock.release() 42 43 44def set_database(dbmod, minconns, timeout=0, postconnect=None): 45 if minconns < 1: 46 raise ValueError("minconns must be greater than or equal to 1") 47 if _dbmod is not None: 48 if _dbmod is dbmod: 49 return 50 raise Exception("dbpool module is already in use") 51 if len(dbmod.apilevel) != 3 or dbmod.apilevel[:2] != "2." or \ 52 not dbmod.apilevel[2].isdigit(): 53 raise ValueError("specified database module is not DB API 2.0 compliant") 54 if dbmod.threadsafety < 1: 55 raise ValueError("specified database module must have threadsafety level" 56 " of at least 1") 57 _log(1, "set_database", dbmod.__name__, minconns, timeout) 58 g = globals() 59 g["_dbmod"] = dbmod 60 g["_available"] = {} 61 g["_minconns"] = minconns 62 g["_timeout"] = timeout 63 g["_postconnect"] = postconnect 64 for v in _COPY_ATTRS: 65 g[v] = getattr(dbmod, v) 66 67 68def connect(*args, **kwargs): 69 if _dbmod is None: 70 raise Exception("No database module has been specified") 71 key = repr(args) + "\0" + repr(kwargs) 72 _log(1, "connect", *args, **kwargs) 73 try: 74 while True: 75 conn = _available[key].get(0) 76 if _timeout == 0 or _time.time() - conn._lastuse < _timeout: 77 _log(2, "connect: returning connection %r from _available" % conn) 78 return conn 79 else: 80 conn._inner._connection = None 81 _log(2, "connect: discarded connection %r from _available due to age" % 82 conn) 83 except (KeyError, _Queue.Empty): 84 conn = _Connection(None, None, *args, **kwargs) 85 _log(2, "connect: created new connection %r" % conn) 86 return conn 87 88 89def _make_available(conn): 90 key = repr(conn._args) + "\0" + repr(conn._kwargs) 91 _log(2, "_make_available", conn) 92 _lock.acquire() 93 try: 94 try: 95 _available[key].put(conn, 0) 96 _log(3, "_make_available: put into existing _available slot") 97 except KeyError: 98 _log(3, "_make_available: created new _available slot") 99 q = _Queue.Queue(_minconns) 100 q.put(conn, 0) 101 _available[key] = q 102 except _Queue.Full: 103 conn._inner._connection = None 104 _log(3, "_make_available: discarded, _available slot full") 105 finally: 106 _lock.release() 107 108 109def _connection_notinuse(ref): 110 # if the Python interpreter is exiting, the globals might already have 111 # been deleted, so check for them explicitly 112 if _refs is None: 113 return 114 inner = _refs[ref] 115 del _refs[ref] 116 inner._cursorref = None 117 if inner._connection is not None: 118 if _make_available is not None and _Connection is not None: 119 _make_available(_Connection(inner)) 120 121 122class _Connection(object): 123 def __init__(self, inner, *args, **kwargs): 124 self._inner = None 125 _log(4, "_Connection", self, inner, *args, **kwargs) 126 if inner is None: 127 self._inner = _InnerConnection(*args, **kwargs) 128 _log(5, "_Connection: new inner=%r" % self._inner) 129 else: 130 self._inner = inner 131 self._inner._outerref = _weakref.ref(self) 132 ref = _weakref.ref(self, _connection_notinuse) 133 _log(5, "_Connection: ref=%r" % ref) 134 _refs[ref] = self._inner 135 136 def __repr__(self): 137 return "<dbpool._Connection(%r) at %x>" % (self._inner, id(self)) 138 139 def cursor(self, *args, **kwargs): 140 # this method would not be necessary (i.e. the __getattr__ would take 141 # care of it) but if someone does dbpool.connect().cursor() all in one 142 # expression, the outer _Connection class was getting garbage-collected 143 # (and hence the actual database connection being put back in the pool) 144 # *in the middle of the expression*, i.e. after connect() was called but 145 # before cursor() was called. So you could end up with 2 cursors on the 146 # same database connection. 147 return self._inner.cursor(*args, **kwargs) 148 149 def __getattr__(self, attr): 150 return getattr(self._inner, attr) 151 152 153class _InnerConnection(object): 154 def __init__(self, connection, *args, **kwargs): 155 self._connection = None 156 _log(4, "_InnerConnection", self, connection, *args, **kwargs) 157 self._args = args 158 self._kwargs = kwargs 159 if connection is None: 160 _log(2, "_InnerConnection: Calling actual connect", *args, **kwargs) 161 self._connection = _dbmod.connect(*args, **kwargs) 162 if _postconnect: 163 _postconnect(self._connection, *args, **kwargs) 164 else: 165 _log(5, "_InnerConnection: Re-using connection %r" % connection) 166 self._connection = connection 167 self._cursorref = None 168 self._outerref = None 169 self._lock = _thread.allocate_lock() 170 self._lastuse = _time.time() 171 172 def __repr__(self): 173 return "<dbpool._InnerConnection(%r) at %x>" % (self._connection, id(self)) 174 175 def close(self): 176 _log(3, "_Connection.close", self) 177 if self._cursorref is not None: 178 c = self._cursorref() 179 if c is not None: 180 _log(4, "_Connection.close: closing cursor %r" % c) 181 c.close() 182 self._cursorref = None 183 self._outerref = None 184 conn = self._connection 185 if conn: 186 self._connection = None 187 if _make_available is not None: 188 _make_available(_Connection(None, conn, *self._args, **self._kwargs)) 189 190 def __getattr__(self, attr): 191 return getattr(self._connection, attr) 192 193 def cursor(self, *args, **kwargs): 194 _log(3, "cursor", self, *args, **kwargs) 195 if _timeout == 0 or _time.time() - self._lastuse < _timeout: 196 self._lock.acquire() 197 try: 198 if self._cursorref is None or self._cursorref() is None: 199 c = _Cursor(self, *args, **kwargs) 200 self._cursorref = _weakref.ref(c) 201 self._lastuse = _time.time() 202 return c 203 finally: 204 self._lock.release() 205 _log(3, "cursor: creating new connection") 206 return connect(*self._args, **self._kwargs).cursor(*args, **kwargs) 207 208 209class _Cursor(object): 210 def __init__(self, connection, *args, **kwargs): 211 self._cursor = None 212 _log(4, "_Cursor", connection, *args, **kwargs) 213 self._connection = connection 214 self._outer = connection._outerref() 215 self._cursor = connection._connection.cursor(*args, **kwargs) 216 217 def __repr__(self): 218 return "<dbpool._Cursor(%r) at %x>" % (self._cursor, id(self)) 219 220 def close(self): 221 _log(4, "_Cursor.close", self) 222 self._connection._cursorref = None 223 self._connection = None 224 self._cursor.close() 225 self._outer = None 226 227 def __getattr__(self, attr): 228 return getattr(self._cursor, attr) 229 230 231def _exiting(): 232 global _make_available 233 _make_available = None 234 235_atexit.register(_exiting) 236