1# Copyright 2014 Mirantis Inc
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15
16"""Thread-safe connection pool for python-memcached."""
17
18import collections
19import contextlib
20import itertools
21import queue
22import threading
23import time
24
25try:
26    import eventlet
27except ImportError:
28    eventlet = None
29import memcache
30from oslo_log import log
31
32from oslo_cache._i18n import _
33from oslo_cache import exception
34
35
36LOG = log.getLogger(__name__)
37
38
39class _MemcacheClient(memcache.Client):
40    """Thread global memcache client
41
42    As client is inherited from threading.local we have to restore object
43    methods overloaded by threading.local so we can reuse clients in
44    different threads
45    """
46    __delattr__ = object.__delattr__
47    __getattribute__ = object.__getattribute__
48    __setattr__ = object.__setattr__
49
50    # Hack for lp 1812935
51    if eventlet and eventlet.patcher.is_monkey_patched('thread'):
52        # NOTE(bnemec): I'm not entirely sure why this works in a
53        # monkey-patched environment and not with vanilla stdlib, but it does.
54        def __new__(cls, *args, **kwargs):
55            return object.__new__(cls)
56    else:
57        __new__ = object.__new__
58
59    def __del__(self):
60        pass
61
62
63_PoolItem = collections.namedtuple('_PoolItem', ['ttl', 'connection'])
64
65
66class ConnectionPool(queue.Queue):
67    """Base connection pool class
68
69    This class implements the basic connection pool logic as an abstract base
70    class.
71    """
72    def __init__(self, maxsize, unused_timeout, conn_get_timeout=None):
73        """Initialize the connection pool.
74
75        :param maxsize: maximum number of client connections for the pool
76        :type maxsize: int
77        :param unused_timeout: idle time to live for unused clients (in
78                               seconds). If a client connection object has been
79                               in the pool and idle for longer than the
80                               unused_timeout, it will be reaped. This is to
81                               ensure resources are released as utilization
82                               goes down.
83        :type unused_timeout: int
84        :param conn_get_timeout: maximum time in seconds to wait for a
85                                 connection. If set to `None` timeout is
86                                 indefinite.
87        :type conn_get_timeout: int
88        """
89        # super() cannot be used here because Queue in stdlib is an
90        # old-style class
91        queue.Queue.__init__(self, maxsize)
92        self._unused_timeout = unused_timeout
93        self._connection_get_timeout = conn_get_timeout
94        self._acquired = 0
95
96    def _create_connection(self):
97        """Returns a connection instance.
98
99        This is called when the pool needs another instance created.
100
101        :returns: a new connection instance
102
103        """
104        raise NotImplementedError
105
106    def _destroy_connection(self, conn):
107        """Destroy and cleanup a connection instance.
108
109        This is called when the pool wishes to get rid of an existing
110        connection. This is the opportunity for a subclass to free up
111        resources and cleanup after itself.
112
113        :param conn: the connection object to destroy
114
115        """
116        raise NotImplementedError
117
118    def _do_log(self, level, msg, *args, **kwargs):
119        if LOG.isEnabledFor(level):
120            thread_id = threading.current_thread().ident
121            args = (id(self), thread_id) + args
122            prefix = 'Memcached pool %s, thread %s: '
123            LOG.log(level, prefix + msg, *args, **kwargs)
124
125    def _debug_logger(self, msg, *args, **kwargs):
126        self._do_log(log.DEBUG, msg, *args, **kwargs)
127
128    def _trace_logger(self, msg, *args, **kwargs):
129        self._do_log(log.TRACE, msg, *args, **kwargs)
130
131    @contextlib.contextmanager
132    def acquire(self):
133        self._trace_logger('Acquiring connection')
134        self._drop_expired_connections()
135        try:
136            conn = self.get(timeout=self._connection_get_timeout)
137        except queue.Empty:
138            raise exception.QueueEmpty(
139                _('Unable to get a connection from pool id %(id)s after '
140                  '%(seconds)s seconds.') %
141                {'id': id(self), 'seconds': self._connection_get_timeout})
142        self._trace_logger('Acquired connection %s', id(conn))
143        try:
144            yield conn
145        finally:
146            self._trace_logger('Releasing connection %s', id(conn))
147            try:
148                # super() cannot be used here because Queue in stdlib is an
149                # old-style class
150                queue.Queue.put(self, conn, block=False)
151            except queue.Full:
152                self._trace_logger('Reaping exceeding connection %s', id(conn))
153                self._destroy_connection(conn)
154
155    def _qsize(self):
156        if self.maxsize:
157            return self.maxsize - self._acquired
158        else:
159            # A value indicating there is always a free connection
160            # if maxsize is None or 0
161            return 1
162
163    # NOTE(dstanek): stdlib and eventlet Queue implementations
164    # have different names for the qsize method. This ensures
165    # that we override both of them.
166    if not hasattr(queue.Queue, '_qsize'):
167        qsize = _qsize
168
169    def _get(self):
170        try:
171            conn = self.queue.pop().connection
172        except IndexError:
173            conn = self._create_connection()
174        self._acquired += 1
175        return conn
176
177    def _drop_expired_connections(self):
178        """Drop all expired connections from the left end of the queue."""
179        now = time.time()
180        try:
181            while self.queue[0].ttl < now:
182                conn = self.queue.popleft().connection
183                self._trace_logger('Reaping connection %s', id(conn))
184                self._destroy_connection(conn)
185        except IndexError:
186            # NOTE(amakarov): This is an expected excepton. so there's no
187            # need to react. We have to handle exceptions instead of
188            # checking queue length as IndexError is a result of race
189            # condition too as well as of mere queue depletio of mere queue
190            # depletionn.
191            pass
192
193    def _put(self, conn):
194        self.queue.append(_PoolItem(
195            ttl=time.time() + self._unused_timeout,
196            connection=conn,
197        ))
198        self._acquired -= 1
199
200
201class MemcacheClientPool(ConnectionPool):
202    def __init__(self, urls, arguments, **kwargs):
203        # super() cannot be used here because Queue in stdlib is an
204        # old-style class
205        ConnectionPool.__init__(self, **kwargs)
206        self.urls = urls
207        self._arguments = arguments
208        # NOTE(morganfainberg): The host objects expect an int for the
209        # deaduntil value. Initialize this at 0 for each host with 0 indicating
210        # the host is not dead.
211        self._hosts_deaduntil = [0] * len(urls)
212
213    def _create_connection(self):
214        # NOTE(morgan): Explicitly set flush_on_reconnect for pooled
215        # connections. This should ensure that stale data is never consumed
216        # from a server that pops in/out due to a network partition
217        # or disconnect.
218        #
219        # See the help from python-memcached:
220        #
221        # param flush_on_reconnect: optional flag which prevents a
222        #        scenario that can cause stale data to be read: If there's more
223        #        than one memcached server and the connection to one is
224        #        interrupted, keys that mapped to that server will get
225        #        reassigned to another. If the first server comes back, those
226        #        keys will map to it again. If it still has its data, get()s
227        #        can read stale data that was overwritten on another
228        #        server. This flag is off by default for backwards
229        #        compatibility.
230        #
231        # The normal non-pooled clients connect explicitly on each use and
232        # does not need the explicit flush_on_reconnect
233        return _MemcacheClient(self.urls, flush_on_reconnect=True,
234                               **self._arguments)
235
236    def _destroy_connection(self, conn):
237        conn.disconnect_all()
238
239    def _get(self):
240        # super() cannot be used here because Queue in stdlib is an
241        # old-style class
242        conn = ConnectionPool._get(self)
243        try:
244            # Propagate host state known to us to this client's list
245            now = time.time()
246            for deaduntil, host in zip(self._hosts_deaduntil, conn.servers):
247                if deaduntil > now and host.deaduntil <= now:
248                    host.mark_dead('propagating death mark from the pool')
249                host.deaduntil = deaduntil
250        except Exception:
251            # We need to be sure that connection doesn't leak from the pool.
252            # This code runs before we enter context manager's try-finally
253            # block, so we need to explicitly release it here.
254            # super() cannot be used here because Queue in stdlib is an
255            # old-style class
256            ConnectionPool._put(self, conn)
257            raise
258        return conn
259
260    def _put(self, conn):
261        try:
262            # If this client found that one of the hosts is dead, mark it as
263            # such in our internal list
264            now = time.time()
265            for i, host in zip(itertools.count(), conn.servers):
266                deaduntil = self._hosts_deaduntil[i]
267                # Do nothing if we already know this host is dead
268                if deaduntil <= now:
269                    if host.deaduntil > now:
270                        self._hosts_deaduntil[i] = host.deaduntil
271                        self._debug_logger(
272                            'Marked host %s dead until %s',
273                            self.urls[i], host.deaduntil)
274                    else:
275                        self._hosts_deaduntil[i] = 0
276            # If all hosts are dead we should forget that they're dead. This
277            # way we won't get completely shut off until dead_retry seconds
278            # pass, but will be checking servers as frequent as we can (over
279            # way smaller socket_timeout)
280            if all(deaduntil > now for deaduntil in self._hosts_deaduntil):
281                self._debug_logger('All hosts are dead. Marking them as live.')
282                self._hosts_deaduntil[:] = [0] * len(self._hosts_deaduntil)
283        finally:
284            # super() cannot be used here because Queue in stdlib is an
285            # old-style class
286            ConnectionPool._put(self, conn)
287