1# $Id: dbpool.py,v 3e2fedadb6ed 2015/11/10 12:52:17 jon $
2
3import weakref as _weakref
4import Queue as _Queue
5import thread as _thread
6import time as _time
7import atexit as _atexit
8
9
10_log_level = 0
11_log_name = "/tmp/dbpool.log"
12_log_file = None
13_log_lock = _thread.allocate_lock()
14
15apilevel = "2.0"
16threadsafety = 2
17
18_dbmod = None
19_lock = _thread.allocate_lock()
20_refs = {}
21
22_COPY_ATTRS = ("paramstyle", "Warning", "Error", "InterfaceError",
23  "DatabaseError", "DataError", "OperationalError", "IntegrityError",
24  "InternalError", "ProgrammingError", "NotSupportedError")
25
26
27def _log(level, message, *args, **kwargs):
28  global _log_file
29
30  if _log_level >= level:
31    if args or kwargs:
32      argslist = [repr(arg) for arg in args]
33      argslist.extend("%s=%r" % item for item in kwargs.items())
34      message += "(" + ", ".join(argslist) + ")"
35    _log_lock.acquire()
36    try:
37      if not _log_file:
38        _log_file = open(_log_name, "a", 1)
39      _log_file.write("%s %s\n" % (_time.strftime("%b %d %H:%M:%S"), message))
40    finally:
41      _log_lock.release()
42
43
44def set_database(dbmod, minconns, timeout=0, postconnect=None):
45  if minconns < 1:
46    raise ValueError("minconns must be greater than or equal to 1")
47  if _dbmod is not None:
48    if _dbmod is dbmod:
49      return
50    raise Exception("dbpool module is already in use")
51  if len(dbmod.apilevel) != 3 or dbmod.apilevel[:2] != "2." or \
52    not dbmod.apilevel[2].isdigit():
53    raise ValueError("specified database module is not DB API 2.0 compliant")
54  if dbmod.threadsafety < 1:
55    raise ValueError("specified database module must have threadsafety level"
56      " of at least 1")
57  _log(1, "set_database", dbmod.__name__, minconns, timeout)
58  g = globals()
59  g["_dbmod"] = dbmod
60  g["_available"] = {}
61  g["_minconns"] = minconns
62  g["_timeout"] = timeout
63  g["_postconnect"] = postconnect
64  for v in _COPY_ATTRS:
65    g[v] = getattr(dbmod, v)
66
67
68def connect(*args, **kwargs):
69  if _dbmod is None:
70    raise Exception("No database module has been specified")
71  key = repr(args) + "\0" + repr(kwargs)
72  _log(1, "connect", *args, **kwargs)
73  try:
74    while True:
75      conn = _available[key].get(0)
76      if _timeout == 0 or _time.time() - conn._lastuse < _timeout:
77        _log(2, "connect: returning connection %r from _available" % conn)
78        return conn
79      else:
80        conn._inner._connection = None
81        _log(2, "connect: discarded connection %r from _available due to age" %
82          conn)
83  except (KeyError, _Queue.Empty):
84    conn = _Connection(None, None, *args, **kwargs)
85    _log(2, "connect: created new connection %r" % conn)
86    return conn
87
88
89def _make_available(conn):
90  key = repr(conn._args) + "\0" + repr(conn._kwargs)
91  _log(2, "_make_available", conn)
92  _lock.acquire()
93  try:
94    try:
95      _available[key].put(conn, 0)
96      _log(3, "_make_available: put into existing _available slot")
97    except KeyError:
98      _log(3, "_make_available: created new _available slot")
99      q = _Queue.Queue(_minconns)
100      q.put(conn, 0)
101      _available[key] = q
102    except _Queue.Full:
103      conn._inner._connection = None
104      _log(3, "_make_available: discarded, _available slot full")
105  finally:
106    _lock.release()
107
108
109def _connection_notinuse(ref):
110  # if the Python interpreter is exiting, the globals might already have
111  # been deleted, so check for them explicitly
112  if _refs is None:
113    return
114  inner = _refs[ref]
115  del _refs[ref]
116  inner._cursorref = None
117  if inner._connection is not None:
118    if _make_available is not None and _Connection is not None:
119      _make_available(_Connection(inner))
120
121
122class _Connection(object):
123  def __init__(self, inner, *args, **kwargs):
124    self._inner = None
125    _log(4, "_Connection", self, inner, *args, **kwargs)
126    if inner is None:
127      self._inner = _InnerConnection(*args, **kwargs)
128      _log(5, "_Connection: new inner=%r" % self._inner)
129    else:
130      self._inner = inner
131    self._inner._outerref = _weakref.ref(self)
132    ref = _weakref.ref(self, _connection_notinuse)
133    _log(5, "_Connection: ref=%r" % ref)
134    _refs[ref] = self._inner
135
136  def __repr__(self):
137    return "<dbpool._Connection(%r) at %x>" % (self._inner, id(self))
138
139  def cursor(self, *args, **kwargs):
140    # this method would not be necessary (i.e. the __getattr__ would take
141    # care of it) but if someone does dbpool.connect().cursor() all in one
142    # expression, the outer _Connection class was getting garbage-collected
143    # (and hence the actual database connection being put back in the pool)
144    # *in the middle of the expression*, i.e. after connect() was called but
145    # before cursor() was called. So you could end up with 2 cursors on the
146    # same database connection.
147    return self._inner.cursor(*args, **kwargs)
148
149  def __getattr__(self, attr):
150    return getattr(self._inner, attr)
151
152
153class _InnerConnection(object):
154  def __init__(self, connection, *args, **kwargs):
155    self._connection = None
156    _log(4, "_InnerConnection", self, connection, *args, **kwargs)
157    self._args = args
158    self._kwargs = kwargs
159    if connection is None:
160      _log(2, "_InnerConnection: Calling actual connect", *args, **kwargs)
161      self._connection = _dbmod.connect(*args, **kwargs)
162      if _postconnect:
163        _postconnect(self._connection, *args, **kwargs)
164    else:
165      _log(5, "_InnerConnection: Re-using connection %r" % connection)
166      self._connection = connection
167    self._cursorref = None
168    self._outerref = None
169    self._lock = _thread.allocate_lock()
170    self._lastuse = _time.time()
171
172  def __repr__(self):
173    return "<dbpool._InnerConnection(%r) at %x>" % (self._connection, id(self))
174
175  def close(self):
176    _log(3, "_Connection.close", self)
177    if self._cursorref is not None:
178      c = self._cursorref()
179      if c is not None:
180        _log(4, "_Connection.close: closing cursor %r" % c)
181        c.close()
182    self._cursorref = None
183    self._outerref = None
184    conn = self._connection
185    if conn:
186      self._connection = None
187      if _make_available is not None:
188        _make_available(_Connection(None, conn, *self._args, **self._kwargs))
189
190  def __getattr__(self, attr):
191    return getattr(self._connection, attr)
192
193  def cursor(self, *args, **kwargs):
194    _log(3, "cursor", self, *args, **kwargs)
195    if _timeout == 0 or _time.time() - self._lastuse < _timeout:
196      self._lock.acquire()
197      try:
198        if self._cursorref is None or self._cursorref() is None:
199          c = _Cursor(self, *args, **kwargs)
200          self._cursorref = _weakref.ref(c)
201          self._lastuse = _time.time()
202          return c
203      finally:
204        self._lock.release()
205    _log(3, "cursor: creating new connection")
206    return connect(*self._args, **self._kwargs).cursor(*args, **kwargs)
207
208
209class _Cursor(object):
210  def __init__(self, connection, *args, **kwargs):
211    self._cursor = None
212    _log(4, "_Cursor", connection, *args, **kwargs)
213    self._connection = connection
214    self._outer = connection._outerref()
215    self._cursor = connection._connection.cursor(*args, **kwargs)
216
217  def __repr__(self):
218    return "<dbpool._Cursor(%r) at %x>" % (self._cursor, id(self))
219
220  def close(self):
221    _log(4, "_Cursor.close", self)
222    self._connection._cursorref = None
223    self._connection = None
224    self._cursor.close()
225    self._outer = None
226
227  def __getattr__(self, attr):
228    return getattr(self._cursor, attr)
229
230
231def _exiting():
232  global _make_available
233  _make_available = None
234
235_atexit.register(_exiting)
236