1# copied from aiopg
2# https://github.com/aio-libs/aiopg/blob/master/aiopg/pool.py
3
4import asyncio
5import collections
6import warnings
7
8from .connection import connect
9from .utils import (_PoolContextManager, _PoolConnectionContextManager,
10                    _PoolAcquireContextManager)
11
12
def create_pool(minsize=1, maxsize=10, echo=False, pool_recycle=-1,
                loop=None, **kwargs):
    """Return a ``_PoolContextManager`` wrapping the pool-creation coroutine.

    This function itself only builds the ``_create_pool`` coroutine object
    from the given arguments and hands it to ``_PoolContextManager``; the
    pool is actually built when that coroutine is run.

    :param minsize: forwarded to ``_create_pool``.
    :param maxsize: forwarded to ``_create_pool``.
    :param echo: forwarded to ``_create_pool``.
    :param pool_recycle: forwarded to ``_create_pool``.
    :param loop: forwarded to ``_create_pool``.
    :param kwargs: any extra keyword arguments, forwarded untouched.
    :return: ``_PoolContextManager`` instance built around the coroutine.
    """
    pool_options = dict(minsize=minsize, maxsize=maxsize, echo=echo,
                        pool_recycle=pool_recycle, loop=loop)
    return _PoolContextManager(_create_pool(**pool_options, **kwargs))
18
19
async def _create_pool(minsize=1, maxsize=10, echo=False, pool_recycle=-1,
                       loop=None, **kwargs):
    """Create a :class:`Pool` and pre-open its first ``minsize`` connections.

    :param minsize: number of connections opened before the pool is returned.
    :param maxsize: passed to :class:`Pool`.
    :param echo: passed to :class:`Pool`.
    :param pool_recycle: passed to :class:`Pool`.
    :param loop: event loop to use; ``asyncio.get_event_loop()`` when ``None``.
    :param kwargs: extra keyword arguments passed to :class:`Pool`.
    :return: the ready-to-use :class:`Pool`.
    :raises ValueError: propagated from :class:`Pool` for invalid sizes.

    Any exception raised while pre-filling the pool is re-raised after the
    connections that were already opened have been closed.
    """
    if loop is None:
        loop = asyncio.get_event_loop()

    pool = Pool(minsize=minsize, maxsize=maxsize, echo=echo,
                pool_recycle=pool_recycle, loop=loop, **kwargs)
    if minsize > 0:
        try:
            # _fill_free_pool() notifies the condition, so its lock
            # must be held while it runs.
            async with pool._cond:
                await pool._fill_free_pool(False)
        except BaseException:
            # The caller never receives this pool, so nobody else could
            # close the connections opened before the failure.
            pool.close()
            await pool.wait_closed()
            raise
    return pool
31
32
class Pool(asyncio.AbstractServer):
    """Connection pool.

    Idle connections are kept in ``_free`` (a deque bounded by ``maxsize``),
    connections handed out by :meth:`acquire` are tracked in ``_used`` until
    they come back through :meth:`release`.  Coroutines that have to wait
    (for a connection, or for the pool to drain) sleep on ``_cond`` and are
    woken by :meth:`_wakeup`.
    """

    def __init__(self, minsize, maxsize, echo, pool_recycle, loop, **kwargs):
        """Set up an empty pool; no connection is opened here.

        :param minsize: number of connections ``_fill_free_pool`` keeps open.
        :param maxsize: upper bound for the pool size.
        :param echo: passed as ``echo`` to ``connect()``.
        :param pool_recycle: maximum idle age, measured with ``loop.time()``,
            after which a free connection is closed instead of reused;
            ``-1`` (or lower) disables the check.
        :param loop: event loop used for timing, futures and tasks.
        :param kwargs: extra keyword arguments passed to ``connect()``.
        :raises ValueError: if ``minsize`` is negative or exceeds ``maxsize``.
        """
        if minsize < 0:
            raise ValueError("minsize should be zero or greater")
        if maxsize < minsize:
            raise ValueError("maxsize should be not less than minsize")
        # NOTE(review): minsize == maxsize == 0 passes the checks above, but
        # such a pool can never hand out a connection (nothing may be opened
        # and a zero-length deque drops whatever is appended) -- confirm
        # whether 0 was meant to be rejected or to mean "unlimited".
        self._minsize = minsize
        self._loop = loop
        self._conn_kwargs = kwargs
        # Number of connect() calls currently in flight.
        self._acquiring = 0
        self._free = collections.deque(maxlen=maxsize)
        self._cond = asyncio.Condition()
        self._used = set()
        # Connections force-closed by terminate() that were not released yet.
        self._terminated = set()
        self._closing = False
        self._closed = False
        self._echo = echo
        self._recycle = pool_recycle

    @property
    def echo(self):
        """The ``echo`` flag given at construction."""
        return self._echo

    @property
    def minsize(self):
        """Minimum number of connections kept by the pool."""
        return self._minsize

    @property
    def maxsize(self):
        """Maximum pool size (the bound of the free-connection deque)."""
        return self._free.maxlen

    @property
    def size(self):
        """Free + in-use connections + connections currently being opened."""
        return self.freesize + len(self._used) + self._acquiring

    @property
    def freesize(self):
        """Number of idle connections in the pool."""
        return len(self._free)

    async def clear(self):
        """Close all free connections in pool."""
        async with self._cond:
            while self._free:
                conn = self._free.popleft()
                await conn.ensure_closed()
            self._cond.notify()

    def close(self):
        """Close pool.

        Mark all pool connections to be closed on getting back to pool.
        Closed pool doesn't allow to acquire new connections.
        """
        if self._closed:
            return
        self._closing = True

    def terminate(self):
        """Terminate pool.

        Close pool with instantly closing all acquired connections also.
        """

        self.close()

        # Remember the force-closed connections so that a later release()
        # of one of them is accepted instead of tripping its assertion.
        for conn in list(self._used):
            conn.close()
            self._terminated.add(conn)

        self._used.clear()

    async def wait_closed(self):
        """Wait for closing all pool's connections.

        Closes every free connection, then sleeps until no connection is in
        use or being opened any more.

        :raises RuntimeError: if called before :meth:`close`.
        """

        if self._closed:
            return
        if not self._closing:
            raise RuntimeError(".wait_closed() should be called "
                               "after .close()")

        while self._free:
            conn = self._free.popleft()
            conn.close()

        async with self._cond:
            # size > freesize  <=>  something is still in use or connecting.
            while self.size > self.freesize:
                await self._cond.wait()

        self._closed = True

    def acquire(self):
        """Acquire free connection from the pool.

        Returns a ``_PoolAcquireContextManager`` wrapping the
        :meth:`_acquire` coroutine.
        """
        coro = self._acquire()
        return _PoolAcquireContextManager(coro, self)

    async def _acquire(self):
        """Return a free connection, waiting until one is available.

        :raises RuntimeError: if the pool is closing when called.
        """
        if self._closing:
            raise RuntimeError("Cannot acquire connection after closing pool")
        async with self._cond:
            while True:
                await self._fill_free_pool(True)
                if self._free:
                    conn = self._free.popleft()
                    assert not conn.closed, conn
                    assert conn not in self._used, (conn, self._used)
                    self._used.add(conn)
                    return conn
                else:
                    # Pool exhausted: sleep until release()/clear() notify.
                    await self._cond.wait()

    async def _open_connection(self):
        """Open one connection, put it into ``_free`` and notify a waiter.

        ``_acquiring`` is raised for the duration of ``connect()`` so that
        :attr:`size` already accounts for the connection being opened.
        Must be called with ``_cond`` held.
        """
        self._acquiring += 1
        try:
            conn = await connect(echo=self._echo, loop=self._loop,
                                 **self._conn_kwargs)
            # NOTE(review): the original comment here read "raise exception
            # if pool is closing", but no such check exists -- confirm
            # whether one is wanted.
            self._free.append(conn)
            self._cond.notify()
        finally:
            self._acquiring -= 1

    async def _fill_free_pool(self, override_min):
        """Drop unusable free connections and open new ones if needed.

        Must be called with ``_cond`` held.

        :param override_min: when true and no free connection is left, open
            one extra connection as long as the pool is below ``maxsize``.
        """
        # Visit every free connection exactly once: look at the right end,
        # then either drop it or rotate it to the left end.
        for _ in range(len(self._free)):
            conn = self._free[-1]
            if conn._reader.at_eof() or conn._reader.exception():
                # The stream reached EOF or failed.
                self._free.pop()
                conn.close()

            elif (self._recycle > -1 and
                  self._loop.time() - conn.last_usage > self._recycle):
                # Idle for longer than pool_recycle.
                self._free.pop()
                conn.close()

            else:
                self._free.rotate()

        while self.size < self.minsize:
            await self._open_connection()
        if self._free:
            return

        if override_min and self.size < self.maxsize:
            await self._open_connection()

    async def _wakeup(self):
        """Notify one coroutine waiting on the pool condition."""
        async with self._cond:
            self._cond.notify()

    def release(self, conn):
        """Release free connection back to the connection pool.

        This is **NOT** a coroutine.

        A connection that is already closed is dropped; one with an open
        transaction, or released while the pool is closing, is closed.
        Anything else goes back to the free list.

        :param conn: a connection previously obtained from this pool.
        :return: an awaitable that is done once a waiter has been notified
            (already done for a connection closed by :meth:`terminate`).
        """
        fut = self._loop.create_future()
        fut.set_result(None)

        if conn in self._terminated:
            assert conn.closed, conn
            self._terminated.remove(conn)
            return fut
        assert conn in self._used, (conn, self._used)
        self._used.remove(conn)
        if not conn.closed:
            in_trans = conn.get_transaction_status()
            if in_trans or self._closing:
                conn.close()
            else:
                self._free.append(conn)
        # Always wake a waiter: even when the connection is not reusable,
        # removing it from _used frees capacity that _acquire() may be
        # waiting for and may be what wait_closed() is waiting on.
        return self._loop.create_task(self._wakeup())

    def get(self):
        """Deprecated, use :meth:`acquire` instead.

        Emits a ``DeprecationWarning`` and returns a
        ``_PoolConnectionContextManager`` created without a connection.
        """
        warnings.warn("pool.get deprecated use pool.acquire instead",
                      DeprecationWarning,
                      stacklevel=2)
        return _PoolConnectionContextManager(self, None)

    def __enter__(self):
        """Always raises: the pool is not a plain context manager."""
        raise RuntimeError(
            '"yield from" should be used as context manager expression')

    def __exit__(self, *args):
        # This must exist because __enter__ exists, even though that
        # always raises; that's how the with-statement works.
        pass  # pragma: nocover

    def __iter__(self):
        # This is not a coroutine.  It is meant to enable the idiom:
        #
        #     with (yield from pool) as conn:
        #         <block>
        #
        # as an alternative to:
        #
        #     conn = yield from pool.acquire()
        #     try:
        #         <block>
        #     finally:
        #         conn.release()
        conn = yield from self.acquire()
        return _PoolConnectionContextManager(self, conn)

    def __await__(self):
        """Deprecated ``with await pool as conn`` support.

        Emits a ``DeprecationWarning``, acquires a connection and returns it
        wrapped in a ``_PoolConnectionContextManager``.
        """
        msg = "with await pool as conn deprecated, use " \
              "async with pool.acquire() as conn instead"
        warnings.warn(msg, DeprecationWarning, stacklevel=2)
        conn = yield from self.acquire()
        return _PoolConnectionContextManager(self, conn)

    async def __aenter__(self):
        """Enter ``async with pool``; returns the pool itself."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the pool and wait until all connections are gone."""
        self.close()
        await self.wait_closed()
262