1"""
2Implementation of `nbio_interface.AbstractIOServices` on top of a
3selector-based I/O loop, such as tornado's and our home-grown
4select_connection's I/O loops.
5
6"""
7import abc
8import logging
9import socket
10import threading
11
12from pika.adapters.utils import nbio_interface, io_services_utils
13from pika.adapters.utils.io_services_utils import (check_callback_arg,
14                                                   check_fd_arg)
15
# Module-level logger used by the adapter and the address resolver below.
LOGGER = logging.getLogger(__name__)
17
18
class AbstractSelectorIOLoop(object):
    """Selector-based I/O loop interface expected by
    `selector_ioloop_adapter.SelectorIOServicesAdapter`

    NOTE: this interface follows the corresponding methods and attributes
     of `tornado.ioloop.IOLoop` in order to avoid additional adapter layering
     when wrapping tornado's IOLoop.

    NOTE: this class does not declare `abc.ABCMeta` as its metaclass, so the
     `abc.abstractmethod` markers below document the expected interface but
     are not enforced when a subclass is instantiated.
    """

    @property
    @abc.abstractmethod
    def READ(self):  # pylint: disable=C0103
        """The value of the I/O loop's READ flag; READ/WRITE/ERROR may be used
        with bitwise operators as expected.

        Implementation note: the implementations can simply replace these
        READ/WRITE/ERROR properties with class-level attributes

        """

    @property
    @abc.abstractmethod
    def WRITE(self):  # pylint: disable=C0103
        """The value of the I/O loop's WRITE flag; READ/WRITE/ERROR may be used
        with bitwise operators as expected

        """

    @property
    @abc.abstractmethod
    def ERROR(self):  # pylint: disable=C0103
        """The value of the I/O loop's ERROR flag; READ/WRITE/ERROR may be used
        with bitwise operators as expected

        """

    @abc.abstractmethod
    def close(self):
        """Release IOLoop's resources.

        The `close()` method is intended to be called by the application or
        test code only after `start()` returns. After calling `close()`, no
        other interaction with the closed instance of `IOLoop` should be
        performed.

        """

    @abc.abstractmethod
    def start(self):
        """Run the I/O loop. It will loop until requested to exit. See `stop()`.

        """

    @abc.abstractmethod
    def stop(self):
        """Request exit from the ioloop. The loop is NOT guaranteed to
        stop before this method returns.

        To invoke `stop()` safely from a thread other than this IOLoop's thread,
        call it via `add_callback()`; e.g.,

            `ioloop.add_callback(ioloop.stop)`

        """

    @abc.abstractmethod
    def call_later(self, delay, callback):
        """Add the callback to the IOLoop timer to be called after delay seconds
        from the time of call on best-effort basis. Returns a handle to the
        timeout.

        :param float delay: The number of seconds to wait to call callback
        :param callable callback: The callback method
        :returns: handle to the created timeout that may be passed to
            `remove_timeout()`
        :rtype: object

        """

    @abc.abstractmethod
    def remove_timeout(self, timeout_handle):
        """Remove a timeout

        :param timeout_handle: Handle of timeout to remove, as returned by
            `call_later()`

        """

    @abc.abstractmethod
    def add_callback(self, callback):
        """Requests a call to the given function as soon as possible in the
        context of this IOLoop's thread.

        NOTE: This is the only thread-safe method in IOLoop. All other
        manipulations of IOLoop must be performed from the IOLoop's thread.

        For example, a thread may request a call to the `stop` method of an
        ioloop that is running in a different thread via
        `ioloop.add_callback(ioloop.stop)`

        :param callable callback: The callback method

        """

    @abc.abstractmethod
    def add_handler(self, fd, handler, events):
        """Start watching the given file descriptor for events

        :param int fd: The file descriptor
        :param callable handler: When requested event(s) occur,
            `handler(fd, events)` will be called.
        :param int events: The event mask using READ, WRITE, ERROR.

        """

    @abc.abstractmethod
    def update_handler(self, fd, events):
        """Changes the events we watch for

        :param int fd: The file descriptor
        :param int events: The event mask using READ, WRITE, ERROR

        """

    @abc.abstractmethod
    def remove_handler(self, fd):
        """Stop watching the given file descriptor for events

        :param int fd: The file descriptor

        """
148
149
class SelectorIOServicesAdapter(io_services_utils.SocketConnectionMixin,
                                io_services_utils.StreamingConnectionMixin,
                                nbio_interface.AbstractIOServices,
                                nbio_interface.AbstractFileDescriptorServices):
    """Implements the
    :py:class:`.nbio_interface.AbstractIOServices` interface
    on top of selector-style native loop having the
    :py:class:`AbstractSelectorIOLoop` interface, such as the
    `select_connection` module's `IOLoop` and
    :py:class:`tornado.ioloop.IOLoop`.

    NOTE:
    :py:class:`.nbio_interface.AbstractFileDescriptorServices`
    interface is only required by the mixins.

    """

    def __init__(self, native_loop):
        """
        :param AbstractSelectorIOLoop native_loop: An instance compatible with
            the `AbstractSelectorIOLoop` interface, but not necessarily derived
            from it.
        """
        self._loop = native_loop

        # Active watchers: maps file descriptors to `_FileDescriptorCallbacks`
        self._watchers = {}

        # Native loop-specific event masks of interest
        self._readable_mask = self._loop.READ
        # NOTE: tying ERROR to WRITE is particularly handy for Windows, whose
        # `select.select()` differs from Posix by reporting
        # connection-establishment failure only through exceptfds (ERROR event),
        # while the typical application workflow is to wait for the socket to
        # become writable when waiting for socket connection to be established.
        self._writable_mask = self._loop.WRITE | self._loop.ERROR

    def get_native_ioloop(self):
        """Implement
        :py:meth:`.nbio_interface.AbstractIOServices.get_native_ioloop()`.

        :returns: the native loop instance passed to the constructor.

        """
        return self._loop

    def close(self):
        """Implement :py:meth:`.nbio_interface.AbstractIOServices.close()`.

        Delegates to the native loop's `close()`.

        """
        self._loop.close()

    def run(self):
        """Implement :py:meth:`.nbio_interface.AbstractIOServices.run()`.

        Delegates to the native loop's `start()`.

        """
        self._loop.start()

    def stop(self):
        """Implement :py:meth:`.nbio_interface.AbstractIOServices.stop()`.

        Delegates to the native loop's `stop()`.

        """
        self._loop.stop()

    def add_callback_threadsafe(self, callback):
        """Implement
        :py:meth:`.nbio_interface.AbstractIOServices.add_callback_threadsafe()`.

        Delegates to the native loop's `add_callback()`.

        :param callable callback: function to call from the loop.

        """
        self._loop.add_callback(callback)

    def call_later(self, delay, callback):
        """Implement :py:meth:`.nbio_interface.AbstractIOServices.call_later()`.

        :param float delay: seconds to wait before calling `callback`.
        :param callable callback: function to call when the timer fires.
        :returns: a `_TimerHandle` wrapping the native timeout handle; its
            `cancel()` removes the timeout from the native loop.

        """
        return _TimerHandle(self._loop.call_later(delay, callback), self._loop)

    def getaddrinfo(self,
                    host,
                    port,
                    on_done,
                    family=0,
                    socktype=0,
                    proto=0,
                    flags=0):
        """Implement :py:meth:`.nbio_interface.AbstractIOServices.getaddrinfo()`.

        Starts an `_AddressResolver` for the request and returns the reference
        that can be used to cancel it.

        :param host: see `socket.getaddrinfo()`
        :param port: see `socket.getaddrinfo()`
        :param callable on_done: `on_done(records|BaseException)` callback
            invoked from the native loop with the outcome.
        :param family: see `socket.getaddrinfo()`
        :param socktype: see `socket.getaddrinfo()`
        :param proto: see `socket.getaddrinfo()`
        :param flags: see `socket.getaddrinfo()`
        :returns: `_SelectorIOLoopIOHandle` whose `cancel()` cancels the
            pending resolution.

        """
        resolver = _AddressResolver(
            native_loop=self._loop,
            host=host,
            port=port,
            family=family,
            socktype=socktype,
            proto=proto,
            flags=flags,
            on_done=on_done)

        # `start()` already returns a `_SelectorIOLoopIOHandle` bound to the
        # resolver, so it is returned as-is rather than being wrapped in a
        # second, redundant handle.
        return resolver.start()

    def set_reader(self, fd, on_readable):
        """Implement
        :py:meth:`.nbio_interface.AbstractFileDescriptorServices.set_reader()`.

        Registers `on_readable` as the reader callback for `fd`, replacing any
        reader callback previously set for that descriptor.

        :param fd: file descriptor to watch (checked via `check_fd_arg`).
        :param callable on_readable: no-arg callback invoked when readable
            events are indicated on `fd`.

        """
        LOGGER.debug('SelectorIOServicesAdapter.set_reader(%s, %r)', fd,
                     on_readable)

        check_fd_arg(fd)
        check_callback_arg(on_readable, 'on_readable')

        try:
            callbacks = self._watchers[fd]
        except KeyError:
            # First watcher on this fd: register it with the native loop
            self._loop.add_handler(fd, self._on_reader_writer_fd_events,
                                   self._readable_mask)
            self._watchers[fd] = _FileDescriptorCallbacks(reader=on_readable)
            LOGGER.debug('set_reader(%s, _) added handler Rd', fd)
        else:
            if callbacks.reader is None:
                # An entry with neither callback is never kept in `_watchers`
                assert callbacks.writer is not None
                self._loop.update_handler(
                    fd, self._readable_mask | self._writable_mask)
                LOGGER.debug('set_reader(%s, _) updated handler RdWr', fd)
            else:
                LOGGER.debug('set_reader(%s, _) replacing reader', fd)

            callbacks.reader = on_readable

    def remove_reader(self, fd):
        """Implement
        :py:meth:`.nbio_interface.AbstractFileDescriptorServices.remove_reader()`.

        :param fd: file descriptor whose reader callback is to be removed.
        :returns: True if a reader callback was set and has been removed;
            False if no reader callback was set for `fd`.
        :rtype: bool

        """
        LOGGER.debug('SelectorIOServicesAdapter.remove_reader(%s)', fd)

        check_fd_arg(fd)

        try:
            callbacks = self._watchers[fd]
        except KeyError:
            LOGGER.debug('remove_reader(%s) neither was set', fd)
            return False

        if callbacks.reader is None:
            assert callbacks.writer is not None
            LOGGER.debug('remove_reader(%s) reader wasn\'t set Wr', fd)
            return False

        callbacks.reader = None

        if callbacks.writer is None:
            # Nothing left to watch on this fd
            del self._watchers[fd]
            self._loop.remove_handler(fd)
            LOGGER.debug('remove_reader(%s) removed handler', fd)
        else:
            self._loop.update_handler(fd, self._writable_mask)
            LOGGER.debug('remove_reader(%s) updated handler Wr', fd)

        return True

    def set_writer(self, fd, on_writable):
        """Implement
        :py:meth:`.nbio_interface.AbstractFileDescriptorServices.set_writer()`.

        Registers `on_writable` as the writer callback for `fd`, replacing any
        writer callback previously set for that descriptor.

        :param fd: file descriptor to watch (checked via `check_fd_arg`).
        :param callable on_writable: no-arg callback invoked when writable
            (or error) events are indicated on `fd`.

        """
        LOGGER.debug('SelectorIOServicesAdapter.set_writer(%s, %r)', fd,
                     on_writable)

        check_fd_arg(fd)
        check_callback_arg(on_writable, 'on_writable')

        try:
            callbacks = self._watchers[fd]
        except KeyError:
            # First watcher on this fd: register it with the native loop
            self._loop.add_handler(fd, self._on_reader_writer_fd_events,
                                   self._writable_mask)
            self._watchers[fd] = _FileDescriptorCallbacks(writer=on_writable)
            LOGGER.debug('set_writer(%s, _) added handler Wr', fd)
        else:
            if callbacks.writer is None:
                # An entry with neither callback is never kept in `_watchers`
                assert callbacks.reader is not None
                self._loop.update_handler(
                    fd, self._readable_mask | self._writable_mask)
                LOGGER.debug('set_writer(%s, _) updated handler RdWr', fd)
            else:
                LOGGER.debug('set_writer(%s, _) replacing writer', fd)

            callbacks.writer = on_writable

    def remove_writer(self, fd):
        """Implement
        :py:meth:`.nbio_interface.AbstractFileDescriptorServices.remove_writer()`.

        :param fd: file descriptor whose writer callback is to be removed.
        :returns: True if a writer callback was set and has been removed;
            False if no writer callback was set for `fd`.
        :rtype: bool

        """
        LOGGER.debug('SelectorIOServicesAdapter.remove_writer(%s)', fd)

        check_fd_arg(fd)

        try:
            callbacks = self._watchers[fd]
        except KeyError:
            LOGGER.debug('remove_writer(%s) neither was set.', fd)
            return False

        if callbacks.writer is None:
            assert callbacks.reader is not None
            LOGGER.debug('remove_writer(%s) writer wasn\'t set Rd', fd)
            return False

        callbacks.writer = None

        if callbacks.reader is None:
            # Nothing left to watch on this fd
            del self._watchers[fd]
            self._loop.remove_handler(fd)
            LOGGER.debug('remove_writer(%s) removed handler', fd)
        else:
            self._loop.update_handler(fd, self._readable_mask)
            LOGGER.debug('remove_writer(%s) updated handler Rd', fd)

        return True

    def _on_reader_writer_fd_events(self, fd, events):
        """Handle indicated file descriptor events requested via `set_reader()`
        and `set_writer()`.

        The writer callback, if due, is invoked before the reader callback.

        :param fd: file descriptor
        :param events: event mask using native loop's READ/WRITE/ERROR. NOTE:
            the writable mask used by this adapter is WRITE | ERROR, so an
            ERROR indication is dispatched to the writer callback (or logged
            as a warning if no writer callback is set).
        """
        callbacks = self._watchers[fd]

        if events & self._readable_mask and callbacks.reader is None:
            # NOTE: we check for consistency here ahead of the writer callback
            # because the writer callback, if any, can change the events being
            # watched
            LOGGER.warning(
                'READ indicated on fd=%s, but reader callback is None; '
                'events=%s', fd, bin(events))

        if events & self._writable_mask:
            if callbacks.writer is not None:
                callbacks.writer()
            else:
                LOGGER.warning(
                    'WRITE indicated on fd=%s, but writer callback is None; '
                    'events=%s', fd, bin(events))

        # The reader callback is re-checked here because it might have been
        # removed in the scope of the writer callback above.
        if events & self._readable_mask and callbacks.reader is not None:
            callbacks.reader()
404
405
class _FileDescriptorCallbacks(object):
    """Pair of optional reader/writer callbacks registered for one file
    descriptor.

    """

    # Only these two attributes may exist on an instance
    __slots__ = ('reader', 'writer')

    def __init__(self, reader=None, writer=None):
        """
        :param reader: callback for readable events, or None if not set
        :param writer: callback for writable events, or None if not set
        """
        self.reader, self.writer = reader, writer
415
416
class _TimerHandle(nbio_interface.AbstractTimerReference):
    """Timer reference returned by this module's `call_later()`; adapts a
    native loop timeout handle to `nbio_interface.AbstractTimerReference`.

    """

    def __init__(self, handle, loop):
        """
        :param opaque handle: timer handle from the underlying loop
            implementation that may be passed to its `remove_timeout()` method
        :param AbstractSelectorIOLoop loop: the I/O loop instance that created
            the timeout.
        """
        self._loop = loop
        self._handle = handle

    def cancel(self):
        """Remove the timeout from the loop that created it.

        Calls after the first successful one are no-ops, because the
        references to the loop and the handle are dropped once the timeout
        has been removed.

        """
        loop = self._loop
        if loop is None:
            # Already cancelled
            return

        loop.remove_timeout(self._handle)
        self._handle = None
        self._loop = None
438
439
class _SelectorIOLoopIOHandle(nbio_interface.AbstractIOReference):
    """I/O reference handed back to callers of this module; adapts any object
    exposing `cancel()` to `nbio_interface.AbstractIOReference`.

    """

    def __init__(self, subject):
        """
        :param subject: subject of the reference containing a `cancel()` method

        """
        # Only the bound `cancel` is retained, not the subject itself
        cancel_subject = subject.cancel
        self._cancel = cancel_subject

    def cancel(self):
        """Cancel pending operation by delegating to the subject's `cancel()`

        :returns: False if was already done or cancelled; True otherwise
        :rtype: bool

        """
        was_cancelled = self._cancel()
        return was_cancelled
460
461
class _AddressResolver(object):
    """Runs `socket.getaddrinfo()` on a helper thread and hands the outcome
    to the user's callback from the given I/O loop.

    NOTE: every request gets its own thread, which may prove inefficient and
    even prohibitive if the app performs many of these operations
    concurrently.
    """
    # Lifecycle states held in `self._state`
    NOT_STARTED = 0
    ACTIVE = 1
    CANCELED = 2
    COMPLETED = 3

    def __init__(self, native_loop, host, port, family, socktype, proto, flags,
                 on_done):
        """
        :param AbstractSelectorIOLoop native_loop: loop on which the result
            is dispatched via its `add_callback()`
        :param host: `see socket.getaddrinfo()`
        :param port: `see socket.getaddrinfo()`
        :param family: `see socket.getaddrinfo()`
        :param socktype: `see socket.getaddrinfo()`
        :param proto: `see socket.getaddrinfo()`
        :param flags: `see socket.getaddrinfo()`
        :param on_done: on_done(records|BaseException) callback for reporting
            result from the given I/O loop. The single arg will be either an
            exception object (check for `BaseException`) in case of failure or
            the result returned by `socket.getaddrinfo()`.
        """
        check_callback_arg(on_done, 'on_done')

        # Request parameters, passed positionally to `socket.getaddrinfo()`
        self._host = host
        self._port = port
        self._family = family
        self._socktype = socktype
        self._proto = proto
        self._flags = flags

        # Completion plumbing
        self._loop = native_loop
        self._on_done = on_done
        self._result = None
        self._state = self.NOT_STARTED

        # Guards `_state` transitions shared with the resolver thread
        self._mutex = threading.Lock()
        self._threading_timer = None

    def _cleanup(self):
        """Drop references to the loop, the user's callback and the timer

        """
        self._on_done = None
        self._threading_timer = None
        self._loop = None

    def start(self):
        """Kick off the asynchronous DNS lookup on a `threading.Timer` thread.

        :returns: reference that may be used to cancel the lookup
        :rtype: nbio_interface.AbstractIOReference

        """
        assert self._state == self.NOT_STARTED, self._state

        self._state = self.ACTIVE
        timer = threading.Timer(0, self._resolve)
        self._threading_timer = timer
        timer.start()

        return _SelectorIOLoopIOHandle(self)

    def cancel(self):
        """Cancel the pending resolver

        :returns: False if was already done or cancelled; True otherwise
        :rtype: bool

        """
        with self._mutex:
            if self._state != self.ACTIVE:
                LOGGER.debug(
                    'Ignoring _AddressResolver cancel request when not ACTIVE; '
                    '(%s:%s); state=%s', self._host, self._port, self._state)
                return False

            LOGGER.debug('Canceling resolver for %s:%s', self._host,
                         self._port)
            self._state = self.CANCELED

            # Best effort only: the timer thread may already be running, in
            # which case the CANCELED state suppresses the result dispatch.
            self._threading_timer.cancel()

            self._cleanup()
            return True

    def _resolve(self):
        """Thread entry point: call `socket.getaddrinfo()` and, unless the
        request was cancelled meanwhile, schedule `_dispatch_result()` on the
        given I/O loop

        """
        try:
            # NOTE: on python 2.x, can't pass keyword args to getaddrinfo()
            outcome = socket.getaddrinfo(self._host, self._port, self._family,
                                         self._socktype, self._proto,
                                         self._flags)
        except Exception as exc:  # pylint: disable=W0703
            LOGGER.error('Address resolution failed: %r', exc)
            # The failure is reported to the user in place of the records
            outcome = exc

        self._result = outcome

        # Hand the result over to the user's event loop
        with self._mutex:
            if self._state != self.ACTIVE:
                LOGGER.debug(
                    'Asynchronous getaddrinfo cancellation detected; '
                    'in thread; host=%r', self._host)
                return

            self._loop.add_callback(self._dispatch_result)

    def _dispatch_result(self):
        """Runs in the user's I/O loop: pass the stored result to the user's
        on_done callback, unless the request is no longer ACTIVE

        """
        if self._state != self.ACTIVE:
            LOGGER.debug(
                'Asynchronous getaddrinfo cancellation detected; '
                'in I/O loop context; host=%r', self._host)
            return

        self._state = self.COMPLETED
        try:
            LOGGER.debug(
                'Invoking asynchronous getaddrinfo() completion callback; '
                'host=%r', self._host)
            self._on_done(self._result)
        finally:
            # References are released even if the user's callback raises
            self._cleanup()
598