1"""A threading based handler. 2 3The :class:`SequentialThreadingHandler` is intended for regular Python 4environments that use threads. 5 6.. warning:: 7 8 Do not use :class:`SequentialThreadingHandler` with applications 9 using asynchronous event loops (like gevent). Use the 10 :class:`~kazoo.handlers.gevent.SequentialGeventHandler` instead. 11 12""" 13from __future__ import absolute_import 14 15from collections import defaultdict 16import errno 17from itertools import chain 18import logging 19import select 20import socket 21import threading 22import time 23 24import six 25 26import kazoo.python2atexit as python2atexit 27from kazoo.handlers import utils 28 29try: 30 import Queue 31except ImportError: # pragma: nocover 32 import queue as Queue 33 34 35# sentinel objects 36_STOP = object() 37 38log = logging.getLogger(__name__) 39 40_HAS_EPOLL = hasattr(select, "epoll") 41 42 43def _to_fileno(obj): 44 if isinstance(obj, six.integer_types): 45 fd = int(obj) 46 elif hasattr(obj, "fileno"): 47 fd = obj.fileno() 48 if not isinstance(fd, six.integer_types): 49 raise TypeError("fileno() returned a non-integer") 50 fd = int(fd) 51 else: 52 raise TypeError("argument must be an int, or have a fileno() method.") 53 54 if fd < 0: 55 raise ValueError( 56 "file descriptor cannot be a negative integer (%d)" % (fd,) 57 ) 58 59 return fd 60 61 62class KazooTimeoutError(Exception): 63 pass 64 65 66class AsyncResult(utils.AsyncResult): 67 """A one-time event that stores a value or an exception""" 68 def __init__(self, handler): 69 super(AsyncResult, self).__init__(handler, 70 threading.Condition, 71 KazooTimeoutError) 72 73 74class SequentialThreadingHandler(object): 75 """Threading handler for sequentially executing callbacks. 76 77 This handler executes callbacks in a sequential manner. A queue is 78 created for each of the callback events, so that each type of event 79 has its callback type run sequentially. These are split into two 80 queues, one for watch events and one for async result completion 81 callbacks. 82 83 Each queue type has a thread worker that pulls the callback event 84 off the queue and runs it in the order the client sees it. 85 86 This split helps ensure that watch callbacks won't block session 87 re-establishment should the connection be lost during a Zookeeper 88 client call. 89 90 Watch and completion callbacks should avoid blocking behavior as 91 the next callback of that type won't be run until it completes. If 92 you need to block, spawn a new thread and return immediately so 93 callbacks can proceed. 94 95 .. note:: 96 97 Completion callbacks can block to wait on Zookeeper calls, but 98 no other completion callbacks will execute until the callback 99 returns. 100 101 """ 102 name = "sequential_threading_handler" 103 timeout_exception = KazooTimeoutError 104 sleep_func = staticmethod(time.sleep) 105 queue_impl = Queue.Queue 106 queue_empty = Queue.Empty 107 108 def __init__(self): 109 """Create a :class:`SequentialThreadingHandler` instance""" 110 self.callback_queue = self.queue_impl() 111 self.completion_queue = self.queue_impl() 112 self._running = False 113 self._state_change = threading.Lock() 114 self._workers = [] 115 116 def _create_thread_worker(self, queue): 117 def _thread_worker(): # pragma: nocover 118 while True: 119 try: 120 func = queue.get() 121 try: 122 if func is _STOP: 123 break 124 func() 125 except Exception: 126 log.exception("Exception in worker queue thread") 127 finally: 128 queue.task_done() 129 except self.queue_empty: 130 continue 131 t = self.spawn(_thread_worker) 132 return t 133 134 def start(self): 135 """Start the worker threads.""" 136 with self._state_change: 137 if self._running: 138 return 139 140 # Spawn our worker threads, we have 141 # - A callback worker for watch events to be called 142 # - A completion worker for completion events to be called 143 for queue in (self.completion_queue, self.callback_queue): 144 w = self._create_thread_worker(queue) 145 self._workers.append(w) 146 self._running = True 147 python2atexit.register(self.stop) 148 149 def stop(self): 150 """Stop the worker threads and empty all queues.""" 151 with self._state_change: 152 if not self._running: 153 return 154 155 self._running = False 156 157 for queue in (self.completion_queue, self.callback_queue): 158 queue.put(_STOP) 159 160 self._workers.reverse() 161 while self._workers: 162 worker = self._workers.pop() 163 worker.join() 164 165 # Clear the queues 166 self.callback_queue = self.queue_impl() 167 self.completion_queue = self.queue_impl() 168 python2atexit.unregister(self.stop) 169 170 def select(self, *args, **kwargs): 171 # if we have epoll, and select is not expected to work 172 # use an epoll-based "select". Otherwise don't touch 173 # anything to minimize changes 174 if _HAS_EPOLL: 175 # if the highest fd we've seen is > 1023 176 if max(map(_to_fileno, chain(*args[:3]))) > 1023: 177 return self._epoll_select(*args, **kwargs) 178 return self._select(*args, **kwargs) 179 180 def _select(self, *args, **kwargs): 181 timeout = kwargs.pop('timeout', None) 182 # either the time to give up, or None 183 end = (time.time() + timeout) if timeout else None 184 while end is None or time.time() < end: 185 if end is not None: 186 # make a list, since tuples aren't mutable 187 args = list(args) 188 189 # set the timeout to the remaining time 190 args[3] = end - time.time() 191 try: 192 return select.select(*args, **kwargs) 193 except select.error as ex: 194 # if the system call was interrupted, we'll retry until timeout 195 # in Python 3, system call interruptions are a native exception 196 # in Python 2, they are not 197 errnum = ex.errno if isinstance(ex, OSError) else ex[0] 198 if errnum == errno.EINTR: 199 continue 200 raise 201 # if we hit our timeout, lets return as a timeout 202 return ([], [], []) 203 204 def _epoll_select(self, rlist, wlist, xlist, timeout=None): 205 """epoll-based drop-in replacement for select to overcome select 206 limitation on a maximum filehandle value 207 """ 208 if timeout is None: 209 timeout = -1 210 eventmasks = defaultdict(int) 211 rfd2obj = defaultdict(list) 212 wfd2obj = defaultdict(list) 213 xfd2obj = defaultdict(list) 214 read_evmask = select.EPOLLIN | select.EPOLLPRI # Just in case 215 216 def store_evmasks(obj_list, evmask, fd2obj): 217 for obj in obj_list: 218 fileno = _to_fileno(obj) 219 eventmasks[fileno] |= evmask 220 fd2obj[fileno].append(obj) 221 222 store_evmasks(rlist, read_evmask, rfd2obj) 223 store_evmasks(wlist, select.EPOLLOUT, wfd2obj) 224 store_evmasks(xlist, select.EPOLLERR, xfd2obj) 225 226 poller = select.epoll() 227 228 for fileno in eventmasks: 229 poller.register(fileno, eventmasks[fileno]) 230 231 try: 232 events = poller.poll(timeout) 233 revents = [] 234 wevents = [] 235 xevents = [] 236 for fileno, event in events: 237 if event & read_evmask: 238 revents += rfd2obj.get(fileno, []) 239 if event & select.EPOLLOUT: 240 wevents += wfd2obj.get(fileno, []) 241 if event & select.EPOLLERR: 242 xevents += xfd2obj.get(fileno, []) 243 finally: 244 poller.close() 245 246 return revents, wevents, xevents 247 248 def socket(self): 249 return utils.create_tcp_socket(socket) 250 251 def create_connection(self, *args, **kwargs): 252 return utils.create_tcp_connection(socket, *args, **kwargs) 253 254 def create_socket_pair(self): 255 return utils.create_socket_pair(socket) 256 257 def event_object(self): 258 """Create an appropriate Event object""" 259 return threading.Event() 260 261 def lock_object(self): 262 """Create a lock object""" 263 return threading.Lock() 264 265 def rlock_object(self): 266 """Create an appropriate RLock object""" 267 return threading.RLock() 268 269 def async_result(self): 270 """Create a :class:`AsyncResult` instance""" 271 return AsyncResult(self) 272 273 def spawn(self, func, *args, **kwargs): 274 t = threading.Thread(target=func, args=args, kwargs=kwargs) 275 t.daemon = True 276 t.start() 277 return t 278 279 def dispatch_callback(self, callback): 280 """Dispatch to the callback object 281 282 The callback is put on separate queues to run depending on the 283 type as documented for the :class:`SequentialThreadingHandler`. 284 285 """ 286 self.callback_queue.put(lambda: callback.func(*callback.args)) 287