# copied from aiopg
# https://github.com/aio-libs/aiopg/blob/master/aiopg/pool.py

import asyncio
import collections
import warnings

from .connection import connect
from .utils import (_PoolContextManager, _PoolConnectionContextManager,
                    _PoolAcquireContextManager)


def create_pool(minsize=1, maxsize=10, echo=False, pool_recycle=-1,
                loop=None, **kwargs):
    """Return the pool-creating coroutine wrapped in _PoolContextManager.

    :param minsize: number of connections the pool opens up front and
        keeps as its lower bound when refilling.
    :param maxsize: upper bound on the pool size; also the ``maxlen`` of
        the internal free-connection deque.
    :param echo: forwarded to ``connect()`` for every new connection.
    :param pool_recycle: a free connection is dropped when
        ``loop.time() - conn.last_usage`` exceeds this value; ``-1``
        disables the check.
    :param loop: event loop to use; resolved with
        ``asyncio.get_event_loop()`` when ``None``.
    :param kwargs: forwarded unchanged to ``connect()``.
    :return: ``_PoolContextManager`` around the ``_create_pool``
        coroutine (presumably awaitable / usable with ``async with`` --
        see ``.utils``).
    """
    coro = _create_pool(minsize=minsize, maxsize=maxsize, echo=echo,
                        pool_recycle=pool_recycle, loop=loop, **kwargs)
    return _PoolContextManager(coro)


async def _create_pool(minsize=1, maxsize=10, echo=False, pool_recycle=-1,
                       loop=None, **kwargs):
    """Create a ``Pool`` and pre-open ``minsize`` connections.

    Takes the same arguments as ``create_pool``.

    :raises ValueError: from ``Pool.__init__`` when ``minsize`` is
        negative or ``maxsize`` is smaller than ``minsize``.
    """
    if loop is None:
        loop = asyncio.get_event_loop()

    pool = Pool(minsize=minsize, maxsize=maxsize, echo=echo,
                pool_recycle=pool_recycle, loop=loop, **kwargs)
    if minsize > 0:
        # _fill_free_pool() notifies the condition, so its lock must be
        # held while it runs.
        async with pool._cond:
            await pool._fill_free_pool(False)
    return pool


class Pool(asyncio.AbstractServer):
    """Connection pool.

    Free connections live in a bounded deque, connections handed out to
    callers are tracked in a set, and an ``asyncio.Condition`` is used to
    park coroutines waiting for a connection or for the pool to drain.
    """

    def __init__(self, minsize, maxsize, echo, pool_recycle, loop, **kwargs):
        """Initialise an empty pool; no connection is opened here.

        :raises ValueError: if ``minsize`` is negative or ``maxsize`` is
            smaller than ``minsize``.
        """
        if minsize < 0:
            raise ValueError("minsize should be zero or greater")
        if maxsize < minsize:
            raise ValueError("maxsize should be not less than minsize")
        self._minsize = minsize
        self._loop = loop
        self._conn_kwargs = kwargs
        # Number of connect() calls currently in flight; counted in size.
        self._acquiring = 0
        # maxlen doubles as the storage for the ``maxsize`` property.
        self._free = collections.deque(maxlen=maxsize)
        self._cond = asyncio.Condition()
        self._used = set()
        # Connections force-closed by terminate() that their holders
        # have not handed back through release() yet.
        self._terminated = set()
        self._closing = False
        self._closed = False
        self._echo = echo
        self._recycle = pool_recycle

    @property
    def echo(self):
        """The ``echo`` flag passed to every new connection."""
        return self._echo

    @property
    def minsize(self):
        """Lower bound used when refilling the pool."""
        return self._minsize

    @property
    def maxsize(self):
        """Upper bound on the pool size."""
        return self._free.maxlen

    @property
    def size(self):
        """Free + in-use connections plus connects still in flight."""
        return self.freesize + len(self._used) + self._acquiring

    @property
    def freesize(self):
        """Number of connections currently sitting in the free queue."""
        return len(self._free)

    async def clear(self):
        """Close all free connections in pool."""
        async with self._cond:
            while self._free:
                conn = self._free.popleft()
                await conn.ensure_closed()
            self._cond.notify()

    def close(self):
        """Close pool.

        Mark all pool connections to be closed on getting back to pool.
        Closed pool doesn't allow to acquire new connections.
        """
        if self._closed:
            return
        self._closing = True

    def terminate(self):
        """Terminate pool.

        Close pool with instantly closing all acquired connections also.
        """

        self.close()

        # Iterate over a snapshot; the closed connections are remembered
        # in _terminated so a later release() can recognise them.
        for conn in list(self._used):
            conn.close()
            self._terminated.add(conn)

        self._used.clear()

    async def wait_closed(self):
        """Wait for closing all pool's connections.

        :raises RuntimeError: if called before ``close()``.
        """

        if self._closed:
            return
        if not self._closing:
            raise RuntimeError(".wait_closed() should be called "
                               "after .close()")

        while self._free:
            conn = self._free.popleft()
            conn.close()

        # The free queue is empty now, so this waits until no connection
        # is in use and no connect() is in flight.
        async with self._cond:
            while self.size > self.freesize:
                await self._cond.wait()

        self._closed = True

    def acquire(self):
        """Acquire free connection from the pool."""
        coro = self._acquire()
        return _PoolAcquireContextManager(coro, self)

    async def _acquire(self):
        """Return a free connection, waiting on the condition if needed.

        :raises RuntimeError: if the pool is closing.
        """
        if self._closing:
            raise RuntimeError("Cannot acquire connection after closing pool")
        async with self._cond:
            while True:
                await self._fill_free_pool(True)
                if self._free:
                    conn = self._free.popleft()
                    assert not conn.closed, conn
                    assert conn not in self._used, (conn, self._used)
                    self._used.add(conn)
                    return conn
                else:
                    await self._cond.wait()

    async def _open_free_connection(self):
        """Open one connection, add it to the free queue and notify.

        Must be called with ``self._cond`` held because it notifies it.
        """
        # The pending connection is counted in ``size`` while connect()
        # is awaited; the counter is restored even if connect() fails.
        self._acquiring += 1
        try:
            conn = await connect(echo=self._echo, loop=self._loop,
                                 **self._conn_kwargs)
            # NOTE: there is no check for the pool having started to
            # close while connect() was pending; the connection is
            # added to the free queue regardless.
            self._free.append(conn)
            self._cond.notify()
        finally:
            self._acquiring -= 1

    async def _fill_free_pool(self, override_min):
        """Drop stale free connections, then top the pool up.

        Must be called with ``self._cond`` held.

        :param override_min: when true and no free connection is
            available after reaching ``minsize``, open one extra
            connection as long as ``size`` is below ``maxsize``.
        """
        # iterate over free connections and remove timeouted ones
        free_size = len(self._free)
        n = 0
        while n < free_size:
            # Always inspect the right end: a healthy connection is
            # rotated to the left end, so after free_size steps every
            # original entry has been looked at exactly once.
            conn = self._free[-1]
            if conn._reader.at_eof() or conn._reader.exception():
                self._free.pop()
                conn.close()

            elif (self._recycle > -1 and
                  self._loop.time() - conn.last_usage > self._recycle):
                self._free.pop()
                conn.close()

            else:
                self._free.rotate()
            n += 1

        while self.size < self.minsize:
            await self._open_free_connection()
        if self._free:
            return

        if override_min and self.size < self.maxsize:
            await self._open_free_connection()

    async def _wakeup(self):
        """Notify one coroutine waiting on the pool condition."""
        async with self._cond:
            self._cond.notify()

    def release(self, conn):
        """Release free connection back to the connection pool.

        This is **NOT** a coroutine.

        A connection that is still open and not inside a transaction goes
        back to the free queue, unless the pool is closing, in which case
        it is closed. A connection with an open transaction is closed
        instead of being reused.

        :return: an awaitable: an already completed future for a
            connection closed by ``terminate()``, otherwise the task that
            wakes up one waiter.
        """
        if conn in self._terminated:
            assert conn.closed, conn
            self._terminated.remove(conn)
            done = self._loop.create_future()
            done.set_result(None)
            return done
        assert conn in self._used, (conn, self._used)
        self._used.remove(conn)
        if not conn.closed:
            in_trans = conn.get_transaction_status()
            if in_trans or self._closing:
                conn.close()
            else:
                self._free.append(conn)
        # Always wake a waiter once the connection has left _used, even
        # when it came back closed or had to be closed here: the pool
        # size dropped, so wait_closed() may be able to finish and
        # _acquire() may be allowed to open a replacement. Skipping the
        # notification in those cases could leave them waiting forever.
        return self._loop.create_task(self._wakeup())

    def get(self):
        """Deprecated; use ``acquire()`` instead."""
        warnings.warn("pool.get deprecated use pool.acquire instead",
                      DeprecationWarning,
                      stacklevel=2)
        return _PoolConnectionContextManager(self, None)

    def __enter__(self):
        raise RuntimeError(
            '"yield from" should be used as context manager expression')

    def __exit__(self, *args):
        # This must exist because __enter__ exists, even though that
        # always raises; that's how the with-statement works.
        pass  # pragma: nocover

    def __iter__(self):
        # This is not a coroutine.  It is meant to enable the idiom:
        #
        #     with (yield from pool) as conn:
        #         <block>
        #
        # as an alternative to:
        #
        #     conn = yield from pool.acquire()
        #     try:
        #         <block>
        #     finally:
        #         conn.release()
        conn = yield from self.acquire()
        return _PoolConnectionContextManager(self, conn)

    def __await__(self):
        """Deprecated ``with await pool as conn`` support."""
        msg = "with await pool as conn deprecated, use " \
              "async with pool.acquire() as conn instead"
        warnings.warn(msg, DeprecationWarning, stacklevel=2)
        conn = yield from self.acquire()
        return _PoolConnectionContextManager(self, conn)

    async def __aenter__(self):
        """Enter ``async with pool``; returns the pool itself."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the pool and wait until all connections are returned."""
        self.close()
        await self.wait_closed()