# Copyright 2014 Mirantis Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Thread-safe connection pool for python-memcached."""

import collections
import contextlib
import itertools
import queue
import threading
import time

try:
    import eventlet
except ImportError:
    eventlet = None
import memcache
from oslo_log import log

from oslo_cache._i18n import _
from oslo_cache import exception


LOG = log.getLogger(__name__)


class _MemcacheClient(memcache.Client):
    """Thread global memcache client

    As client is inherited from threading.local we have to restore object
    methods overloaded by threading.local so we can reuse clients in
    different threads
    """
    __delattr__ = object.__delattr__
    __getattribute__ = object.__getattribute__
    __setattr__ = object.__setattr__

    # Hack for lp 1812935
    if eventlet and eventlet.patcher.is_monkey_patched('thread'):
        # NOTE(bnemec): I'm not entirely sure why this works in a
        # monkey-patched environment and not with vanilla stdlib, but it does.
        def __new__(cls, *args, **kwargs):
            return object.__new__(cls)
    else:
        __new__ = object.__new__

    def __del__(self):
        # Intentionally a no-op: threading.local's finalizer must not run
        # for these shared (non-thread-local) instances.
        pass


# A pooled connection together with the absolute time (epoch seconds) at
# which it expires if it stays unused.
_PoolItem = collections.namedtuple('_PoolItem', ['ttl', 'connection'])


class ConnectionPool(queue.Queue):
    """Base connection pool class

    This class implements the basic connection pool logic as an abstract base
    class.
    """
    def __init__(self, maxsize, unused_timeout, conn_get_timeout=None):
        """Initialize the connection pool.

        :param maxsize: maximum number of client connections for the pool
        :type maxsize: int
        :param unused_timeout: idle time to live for unused clients (in
                               seconds). If a client connection object has
                               been in the pool and idle for longer than the
                               unused_timeout, it will be reaped. This is to
                               ensure resources are released as utilization
                               goes down.
        :type unused_timeout: int
        :param conn_get_timeout: maximum time in seconds to wait for a
                                 connection. If set to `None` timeout is
                                 indefinite.
        :type conn_get_timeout: int
        """
        super().__init__(maxsize)
        self._unused_timeout = unused_timeout
        self._connection_get_timeout = conn_get_timeout
        # Number of connections currently checked out of the pool; used by
        # _qsize() to make Queue block when maxsize is reached.
        self._acquired = 0

    def _create_connection(self):
        """Returns a connection instance.

        This is called when the pool needs another instance created.

        :returns: a new connection instance

        """
        raise NotImplementedError

    def _destroy_connection(self, conn):
        """Destroy and cleanup a connection instance.

        This is called when the pool wishes to get rid of an existing
        connection. This is the opportunity for a subclass to free up
        resources and cleanup after itself.

        :param conn: the connection object to destroy

        """
        raise NotImplementedError

    def _do_log(self, level, msg, *args, **kwargs):
        # Prefix every message with the pool id and current thread id so
        # concurrent pool activity can be disentangled in the logs.
        if LOG.isEnabledFor(level):
            thread_id = threading.current_thread().ident
            args = (id(self), thread_id) + args
            prefix = 'Memcached pool %s, thread %s: '
            LOG.log(level, prefix + msg, *args, **kwargs)

    def _debug_logger(self, msg, *args, **kwargs):
        self._do_log(log.DEBUG, msg, *args, **kwargs)

    def _trace_logger(self, msg, *args, **kwargs):
        self._do_log(log.TRACE, msg, *args, **kwargs)

    @contextlib.contextmanager
    def acquire(self):
        """Check a connection out of the pool for the duration of a
        ``with`` block, returning it to the pool afterwards.

        :raises oslo_cache.exception.QueueEmpty: if no connection became
            available within ``conn_get_timeout`` seconds.
        """
        self._trace_logger('Acquiring connection')
        self._drop_expired_connections()
        try:
            conn = self.get(timeout=self._connection_get_timeout)
        except queue.Empty:
            raise exception.QueueEmpty(
                _('Unable to get a connection from pool id %(id)s after '
                  '%(seconds)s seconds.') %
                {'id': id(self), 'seconds': self._connection_get_timeout})
        self._trace_logger('Acquired connection %s', id(conn))
        try:
            yield conn
        finally:
            self._trace_logger('Releasing connection %s', id(conn))
            try:
                super().put(conn, block=False)
            except queue.Full:
                # The pool is already at capacity (e.g. maxsize shrank or a
                # race refilled it); dispose of the surplus connection.
                self._trace_logger('Reaping exceeding connection %s',
                                   id(conn))
                self._destroy_connection(conn)

    def _qsize(self):
        # Report remaining capacity rather than queued items: connections
        # are created lazily in _get(), so "free slots" is what should
        # gate Queue.get() blocking.
        if self.maxsize:
            return self.maxsize - self._acquired
        else:
            # A value indicating there is always a free connection
            # if maxsize is None or 0
            return 1

    # NOTE(dstanek): stdlib and eventlet Queue implementations
    # have different names for the qsize method. This ensures
    # that we override both of them.
    if not hasattr(queue.Queue, '_qsize'):
        qsize = _qsize

    def _get(self):
        # Reuse the most recently returned (right end) connection if any;
        # otherwise create one lazily.
        try:
            conn = self.queue.pop().connection
        except IndexError:
            conn = self._create_connection()
        self._acquired += 1
        return conn

    def _drop_expired_connections(self):
        """Drop all expired connections from the left end of the queue."""
        now = time.time()
        try:
            # Oldest items sit at the left end, so we can stop at the first
            # unexpired one.
            while self.queue[0].ttl < now:
                conn = self.queue.popleft().connection
                self._trace_logger('Reaping connection %s', id(conn))
                self._destroy_connection(conn)
        except IndexError:
            # NOTE(amakarov): This is an expected exception, so there's no
            # need to react. We have to handle exceptions instead of
            # checking queue length, as IndexError is a result of a race
            # condition as well as of mere queue depletion.
            pass

    def _put(self, conn):
        # Stamp the connection with its idle expiry time before queuing it.
        self.queue.append(_PoolItem(
            ttl=time.time() + self._unused_timeout,
            connection=conn,
        ))
        self._acquired -= 1


class MemcacheClientPool(ConnectionPool):
    """Connection pool of ``_MemcacheClient`` objects that shares dead-host
    state across all pooled clients.
    """
    def __init__(self, urls, arguments, **kwargs):
        """Initialize the memcache client pool.

        :param urls: list of memcached server addresses
        :param arguments: extra keyword arguments passed through to
                          ``memcache.Client``
        :param kwargs: forwarded to :class:`ConnectionPool`
        """
        super().__init__(**kwargs)
        self.urls = urls
        self._arguments = arguments
        # NOTE(morganfainberg): The host objects expect an int for the
        # deaduntil value. Initialize this at 0 for each host with 0 indicating
        # the host is not dead.
        self._hosts_deaduntil = [0] * len(urls)

    def _create_connection(self):
        # NOTE(morgan): Explicitly set flush_on_reconnect for pooled
        # connections. This should ensure that stale data is never consumed
        # from a server that pops in/out due to a network partition
        # or disconnect.
        #
        # See the help from python-memcached:
        #
        # param flush_on_reconnect: optional flag which prevents a
        #       scenario that can cause stale data to be read: If there's more
        #       than one memcached server and the connection to one is
        #       interrupted, keys that mapped to that server will get
        #       reassigned to another. If the first server comes back, those
        #       keys will map to it again. If it still has its data, get()s
        #       can read stale data that was overwritten on another
        #       server. This flag is off by default for backwards
        #       compatibility.
        #
        # The normal non-pooled clients connect explicitly on each use and
        # does not need the explicit flush_on_reconnect
        return _MemcacheClient(self.urls, flush_on_reconnect=True,
                               **self._arguments)

    def _destroy_connection(self, conn):
        conn.disconnect_all()

    def _get(self):
        conn = super()._get()
        try:
            # Propagate host state known to us to this client's list
            now = time.time()
            for deaduntil, host in zip(self._hosts_deaduntil, conn.servers):
                if deaduntil > now and host.deaduntil <= now:
                    host.mark_dead('propagating death mark from the pool')
                    host.deaduntil = deaduntil
        except Exception:
            # We need to be sure that connection doesn't leak from the pool.
            # This code runs before we enter context manager's try-finally
            # block, so we need to explicitly release it here.
            super()._put(conn)
            raise
        return conn

    def _put(self, conn):
        try:
            # If this client found that one of the hosts is dead, mark it as
            # such in our internal list
            now = time.time()
            for i, host in enumerate(conn.servers):
                deaduntil = self._hosts_deaduntil[i]
                # Do nothing if we already know this host is dead
                if deaduntil <= now:
                    if host.deaduntil > now:
                        self._hosts_deaduntil[i] = host.deaduntil
                        self._debug_logger(
                            'Marked host %s dead until %s',
                            self.urls[i], host.deaduntil)
                    else:
                        self._hosts_deaduntil[i] = 0
            # If all hosts are dead we should forget that they're dead. This
            # way we won't get completely shut off until dead_retry seconds
            # pass, but will be checking servers as frequent as we can (over
            # way smaller socket_timeout)
            if all(deaduntil > now for deaduntil in self._hosts_deaduntil):
                self._debug_logger('All hosts are dead. Marking them as live.')
                self._hosts_deaduntil[:] = [0] * len(self._hosts_deaduntil)
        finally:
            super()._put(conn)