1"""A threading based handler.
2
3The :class:`SequentialThreadingHandler` is intended for regular Python
4environments that use threads.
5
6.. warning::
7
8    Do not use :class:`SequentialThreadingHandler` with applications
9    using asynchronous event loops (like gevent). Use the
10    :class:`~kazoo.handlers.gevent.SequentialGeventHandler` instead.
11
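
This is the default handler for :class:`~kazoo.client.KazooClient`, so it
normally does not need to be created directly. A minimal usage sketch
(the host address is a placeholder):

.. code-block:: python

    from kazoo.client import KazooClient
    from kazoo.handlers.threading import SequentialThreadingHandler

    client = KazooClient(hosts="127.0.0.1:2181",
                         handler=SequentialThreadingHandler())
    client.start()
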
12"""
from __future__ import absolute_import

from collections import defaultdict
import errno
from itertools import chain
import logging
import select
import socket
import threading
import time

import six

import kazoo.python2atexit as python2atexit
from kazoo.handlers import utils

try:
    import Queue
except ImportError:  # pragma: nocover
    import queue as Queue


# sentinel object used to signal worker threads to stop
_STOP = object()

log = logging.getLogger(__name__)

_HAS_EPOLL = hasattr(select, "epoll")


def _to_fileno(obj):
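    """Return *obj* as an integer file descriptor.

    Accepts what ``select.select`` accepts: an integer fd or an object
    with a ``fileno()`` method.
    """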
    if isinstance(obj, six.integer_types):
        fd = int(obj)
    elif hasattr(obj, "fileno"):
        fd = obj.fileno()
        if not isinstance(fd, six.integer_types):
            raise TypeError("fileno() returned a non-integer")
        fd = int(fd)
    else:
        raise TypeError("argument must be an int, or have a fileno() method.")

    if fd < 0:
        raise ValueError(
            "file descriptor cannot be a negative integer (%d)" % (fd,)
        )

    return fd


class KazooTimeoutError(Exception):
    pass


class AsyncResult(utils.AsyncResult):
    """A one-time event that stores a value or an exception"""
    def __init__(self, handler):
        super(AsyncResult, self).__init__(handler,
                                          threading.Condition,
                                          KazooTimeoutError)


class SequentialThreadingHandler(object):
    """Threading handler for sequentially executing callbacks.

    This handler executes callbacks sequentially. Callback events are
    split into two queues, one for watch events and one for async
    result completion callbacks, so that each type of event runs in
    the order it was received.

    Each queue has a worker thread that pulls callbacks off the queue
    and runs them in the order the client saw the events.

    This split helps ensure that watch callbacks won't block session
    re-establishment should the connection be lost during a Zookeeper
    client call.

    Watch and completion callbacks should avoid blocking behavior as
    the next callback of that type won't be run until it completes. If
    you need to block, spawn a new thread and return immediately so
    callbacks can proceed.
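
    For example, a watch callback could hand blocking work to another
    thread via :meth:`spawn` and return at once (illustrative sketch;
    ``handler`` and ``do_blocking_work`` are placeholders):

    .. code-block:: python

        def my_watch(event):
            # hand the slow work off so the callback queue keeps moving
            handler.spawn(do_blocking_work, event)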

    .. note::

        Completion callbacks can block to wait on Zookeeper calls, but
        no other completion callbacks will execute until the callback
        returns.

    """
    name = "sequential_threading_handler"
    timeout_exception = KazooTimeoutError
    sleep_func = staticmethod(time.sleep)
    queue_impl = Queue.Queue
    queue_empty = Queue.Empty

    def __init__(self):
        """Create a :class:`SequentialThreadingHandler` instance"""
        self.callback_queue = self.queue_impl()
        self.completion_queue = self.queue_impl()
        self._running = False
        self._state_change = threading.Lock()
        self._workers = []

    def _create_thread_worker(self, queue):
        def _thread_worker():  # pragma: nocover
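            # Run queued callables one at a time; the _STOP sentinel
            # tells the worker to exit its loop.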
            while True:
                try:
                    func = queue.get()
                    try:
                        if func is _STOP:
                            break
                        func()
                    except Exception:
                        log.exception("Exception in worker queue thread")
                    finally:
                        queue.task_done()
                except self.queue_empty:
                    continue
        t = self.spawn(_thread_worker)
        return t

    def start(self):
        """Start the worker threads."""
        with self._state_change:
            if self._running:
                return

            # Spawn our worker threads; we have:
            # - A callback worker for watch events to be called
            # - A completion worker for completion events to be called
            for queue in (self.completion_queue, self.callback_queue):
                w = self._create_thread_worker(queue)
                self._workers.append(w)
            self._running = True
            python2atexit.register(self.stop)

    def stop(self):
        """Stop the worker threads and empty all queues."""
        with self._state_change:
            if not self._running:
                return

            self._running = False

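            # Wake each worker with the sentinel so its loop exits.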
            for queue in (self.completion_queue, self.callback_queue):
                queue.put(_STOP)

            self._workers.reverse()
            while self._workers:
                worker = self._workers.pop()
                worker.join()

            # Clear the queues
            self.callback_queue = self.queue_impl()
            self.completion_queue = self.queue_impl()
            python2atexit.unregister(self.stop)

    def select(self, *args, **kwargs):
        # If epoll is available and select() cannot handle these file
        # descriptors, use an epoll-based "select". Otherwise leave the
        # call untouched to minimize changes.
        if _HAS_EPOLL:
            # any fd above 1023 exceeds select's FD_SETSIZE limit
            if max(map(_to_fileno, chain(*args[:3]))) > 1023:
                return self._epoll_select(*args, **kwargs)
        return self._select(*args, **kwargs)

    def _select(self, *args, **kwargs):
        timeout = kwargs.pop('timeout', None)
        # either the time to give up, or None
        end = (time.time() + timeout) if timeout else None
        while end is None or time.time() < end:
            if end is not None:
                # make a list, since tuples aren't mutable
                args = list(args)

                # set the timeout to the remaining time
                args[3] = end - time.time()
            try:
                return select.select(*args, **kwargs)
            except select.error as ex:
                # If the system call was interrupted, retry until the
                # timeout expires. In Python 3 an interrupted call
                # raises a native exception (OSError); in Python 2 it
                # does not, hence the ex[0] fallback.
                errnum = ex.errno if isinstance(ex, OSError) else ex[0]
                if errnum == errno.EINTR:
                    continue
                raise
        # we hit our timeout, so return empty lists as select() does
        return ([], [], [])

    def _epoll_select(self, rlist, wlist, xlist, timeout=None):
        """epoll-based drop-in replacement for select() to overcome
        select's limit on the maximum file descriptor value
        """
        if timeout is None:
            timeout = -1
        eventmasks = defaultdict(int)
        rfd2obj = defaultdict(list)
        wfd2obj = defaultdict(list)
        xfd2obj = defaultdict(list)
        # treat priority/out-of-band data (EPOLLPRI) as readable too
        read_evmask = select.EPOLLIN | select.EPOLLPRI

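        # Build the per-fd event masks and remember which caller-supplied
        # objects map to each fd, so results can be translated back.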
        def store_evmasks(obj_list, evmask, fd2obj):
            for obj in obj_list:
                fileno = _to_fileno(obj)
                eventmasks[fileno] |= evmask
                fd2obj[fileno].append(obj)

        store_evmasks(rlist, read_evmask, rfd2obj)
        store_evmasks(wlist, select.EPOLLOUT, wfd2obj)
        store_evmasks(xlist, select.EPOLLERR, xfd2obj)

        poller = select.epoll()

        for fileno in eventmasks:
            poller.register(fileno, eventmasks[fileno])

        try:
            events = poller.poll(timeout)
            revents = []
            wevents = []
            xevents = []
            for fileno, event in events:
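                # translate each ready fd back to the objects passed in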
                if event & read_evmask:
                    revents += rfd2obj.get(fileno, [])
                if event & select.EPOLLOUT:
                    wevents += wfd2obj.get(fileno, [])
                if event & select.EPOLLERR:
                    xevents += xfd2obj.get(fileno, [])
        finally:
            poller.close()

        return revents, wevents, xevents

    def socket(self):
        return utils.create_tcp_socket(socket)

    def create_connection(self, *args, **kwargs):
        return utils.create_tcp_connection(socket, *args, **kwargs)

    def create_socket_pair(self):
        return utils.create_socket_pair(socket)

    def event_object(self):
        """Create an appropriate Event object"""
        return threading.Event()

    def lock_object(self):
        """Create a lock object"""
        return threading.Lock()

    def rlock_object(self):
        """Create an appropriate RLock object"""
        return threading.RLock()

    def async_result(self):
        """Create an :class:`AsyncResult` instance"""
        return AsyncResult(self)

    def spawn(self, func, *args, **kwargs):
        """Spawn a daemon thread running *func* and return it."""
        t = threading.Thread(target=func, args=args, kwargs=kwargs)
        # daemon threads never block interpreter shutdown
        t.daemon = True
        t.start()
        return t

    def dispatch_callback(self, callback):
        """Dispatch to the callback object

        The callback is queued on the watch callback queue, which runs
        callbacks of this type in order as documented for the
        :class:`SequentialThreadingHandler`.

        """
        self.callback_queue.put(lambda: callback.func(*callback.args))