1"""Utilities for implementing `nbio_interface.AbstractIOServices` for
2pika connection adapters.
3
4"""
5
6import collections
7import errno
8import functools
9import logging
10import numbers
11import os
12import socket
13import ssl
14import sys
15import traceback
16
17from pika.adapters.utils.nbio_interface import (AbstractIOReference,
18                                                AbstractStreamTransport)
19import pika.compat
20import pika.diagnostic_utils
21
22# "Try again" error codes for non-blocking socket I/O - send()/recv().
23# NOTE: POSIX.1 allows either error to be returned for this case and doesn't require
24# them to have the same value.
25_TRY_IO_AGAIN_SOCK_ERROR_CODES = (
26    errno.EAGAIN,
27    errno.EWOULDBLOCK,
28)
29
30# "Connection establishment pending" error codes for non-blocking socket
31# connect() call.
32# NOTE: EINPROGRESS for Posix and EWOULDBLOCK for Windows
33_CONNECTION_IN_PROGRESS_SOCK_ERROR_CODES = (
34    errno.EINPROGRESS,
35    errno.EWOULDBLOCK,
36)
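
# For illustration only (not used by this module): the codes above come into
# play in a non-blocking connect sequence that looks roughly like the sketch
# below, where `addr` is a resolved address tuple (see _AsyncSocketConnector
# further down for the real implementation):
#
#     sock.setblocking(False)
#     try:
#         sock.connect(addr)
#     except OSError as exc:
#         if exc.errno not in _CONNECTION_IN_PROGRESS_SOCK_ERROR_CODES:
#             raise
#     # ...then wait for writability and check SO_ERROR to learn the outcome.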

_LOGGER = logging.getLogger(__name__)

# Decorator that logs exceptions escaping from the decorated function
_log_exceptions = pika.diagnostic_utils.create_log_exception_decorator(_LOGGER)  # pylint: disable=C0103


def check_callback_arg(callback, name):
    """Raise TypeError if callback is not callable

    :param callback: callback to check
    :param name: Name to include in exception text
    :raises TypeError:

    """
    if not callable(callback):
        raise TypeError('{} must be callable, but got {!r}'.format(
            name, callback))


def check_fd_arg(fd):
    """Raise TypeError if file descriptor is not an integer

    :param fd: file descriptor
    :raises TypeError:

    """
    if not isinstance(fd, numbers.Integral):
        raise TypeError(
            'Parameter must be a file descriptor, but got {!r}'.format(fd))


def _retry_on_sigint(func):
    """Function decorator for retrying on SIGINT.

    """

    @functools.wraps(func)
    def retry_sigint_wrap(*args, **kwargs):
        """Wrapper for decorated function"""
        while True:
            try:
                return func(*args, **kwargs)
            except pika.compat.SOCKET_ERROR as error:
                if error.errno == errno.EINTR:
                    continue
                else:
                    raise

    return retry_sigint_wrap


class SocketConnectionMixin(object):
    """Implements
    `pika.adapters.utils.nbio_interface.AbstractIOServices.connect_socket()`
    on top of
    `pika.adapters.utils.nbio_interface.AbstractFileDescriptorServices` and
    basic `pika.adapters.utils.nbio_interface.AbstractIOServices`.

    """

    def connect_socket(self, sock, resolved_addr, on_done):
        """Implement
        :py:meth:`.nbio_interface.AbstractIOServices.connect_socket()`.

        """
        return _AsyncSocketConnector(
            nbio=self, sock=sock, resolved_addr=resolved_addr,
            on_done=on_done).start()

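# Illustrative sketch only (names below are hypothetical, not part of pika's
# API): a connection adapter's I/O services class typically combines these
# mixins with a concrete AbstractFileDescriptorServices/AbstractIOServices
# implementation, and callers then connect a non-blocking socket like so:
#
#     class _MyIOServices(SocketConnectionMixin, StreamingConnectionMixin,
#                         _MySelectorIOServicesBase):
#         """Gains connect_socket() and create_streaming_connection()."""
#
#     def on_connect_done(result):
#         # result is None on success or a BaseException on failure
#         ...
#
#     ref = nbio.connect_socket(sock, ('127.0.0.1', 5672), on_connect_done)
#     # ref.cancel() may be called before completion; on_done is then skipped.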

class StreamingConnectionMixin(object):
    """Implements
    `.nbio_interface.AbstractIOServices.create_streaming_connection()` on
    top of `.nbio_interface.AbstractFileDescriptorServices` and basic
    `nbio_interface.AbstractIOServices` services.

    """

    def create_streaming_connection(self,
                                    protocol_factory,
                                    sock,
                                    on_done,
                                    ssl_context=None,
                                    server_hostname=None):
        """Implement
        :py:meth:`.nbio_interface.AbstractIOServices.create_streaming_connection()`.

        """
        try:
            return _AsyncStreamConnector(
                nbio=self,
                protocol_factory=protocol_factory,
                sock=sock,
                ssl_context=ssl_context,
                server_hostname=server_hostname,
                on_done=on_done).start()
        except Exception as error:
            _LOGGER.error('create_streaming_connection(%s) failed: %r', sock,
                          error)
            # Close the socket since this function takes ownership
            try:
                sock.close()
            except Exception as error:  # pylint: disable=W0703
                # We log and suppress the exception from sock.close() so that
                # the original error from _AsyncStreamConnector constructor will
                # percolate
                _LOGGER.error('%s.close() failed: %r', sock, error)

            raise

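# Illustrative sketch only (hypothetical names): create_streaming_connection()
# takes a protocol factory and an already-connected non-blocking socket;
# on_done receives either a (transport, protocol) pair or a BaseException.
# Passing an SSL context and server hostname requests TLS negotiation first:
#
#     ref = nbio.create_streaming_connection(
#         protocol_factory=_MyStreamProtocol,
#         sock=connected_sock,
#         on_done=on_streaming_done,
#         ssl_context=ssl.create_default_context(),
#         server_hostname='rabbitmq.example.com')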

class _AsyncServiceAsyncHandle(AbstractIOReference):
    """This module's adaptation of `.nbio_interface.AbstractIOReference`

    """

    def __init__(self, subject):
        """
        :param subject: subject of the reference containing a `cancel()` method

        """
        self._cancel = subject.cancel

    def cancel(self):
        """Cancel pending operation

        :returns: False if was already done or cancelled; True otherwise
        :rtype: bool

        """
        return self._cancel()


class _AsyncSocketConnector(object):
    """Connects the given non-blocking socket asynchronously using
    `.nbio_interface.AbstractFileDescriptorServices` and basic
    `.nbio_interface.AbstractIOServices`. Used for implementing
    `.nbio_interface.AbstractIOServices.connect_socket()`.
    """

    _STATE_NOT_STARTED = 0  # start() not called yet
    _STATE_ACTIVE = 1  # workflow started
    _STATE_CANCELED = 2  # workflow aborted by user's cancel() call
    _STATE_COMPLETED = 3  # workflow completed: succeeded or failed

    def __init__(self, nbio, sock, resolved_addr, on_done):
        """
        :param AbstractIOServices | AbstractFileDescriptorServices nbio:
        :param socket.socket sock: non-blocking socket that needs to be
            connected via `socket.socket.connect()`
        :param tuple resolved_addr: resolved destination address/port two-tuple
            which is compatible with the given socket's address family
        :param callable on_done: user callback that takes None upon successful
            completion or exception upon error (check for `BaseException`) as
            its only arg. It will not be called if the operation was cancelled.
        :raises ValueError: if host portion of `resolved_addr` is not an IP
            address or is inconsistent with the socket's address family as
            validated via `socket.inet_pton()`
        """
        check_callback_arg(on_done, 'on_done')

        try:
            socket.inet_pton(sock.family, resolved_addr[0])
        except Exception as error:  # pylint: disable=W0703
            if not hasattr(socket, 'inet_pton'):
                _LOGGER.debug(
                    'Unable to check resolved address: no socket.inet_pton().')
            else:
                msg = ('Invalid or unresolved IP address '
                       '{!r} for socket {}: {!r}').format(
                           resolved_addr, sock, error)
                _LOGGER.error(msg)
                raise ValueError(msg)

        self._nbio = nbio
        self._sock = sock
        self._addr = resolved_addr
        self._on_done = on_done
        self._state = self._STATE_NOT_STARTED
        self._watching_socket_events = False

    @_log_exceptions
    def _cleanup(self):
        """Remove socket watcher, if any

        """
        if self._watching_socket_events:
            self._watching_socket_events = False
            self._nbio.remove_writer(self._sock.fileno())

    def start(self):
        """Start asynchronous connection establishment.

        :rtype: AbstractIOReference
        """
        assert self._state == self._STATE_NOT_STARTED, (
            '_AsyncSocketConnector.start(): expected _STATE_NOT_STARTED',
            self._state)

        self._state = self._STATE_ACTIVE

        # Continue the rest of the operation on the I/O loop to avoid calling
        # user's completion callback from the scope of user's call
        self._nbio.add_callback_threadsafe(self._start_async)

        return _AsyncServiceAsyncHandle(self)

    def cancel(self):
        """Cancel pending connection request without calling user's completion
        callback.

        :returns: False if was already done or cancelled; True otherwise
        :rtype: bool

        """
        if self._state == self._STATE_ACTIVE:
            self._state = self._STATE_CANCELED
            _LOGGER.debug('User canceled connection request for %s to %s',
                          self._sock, self._addr)
            self._cleanup()
            return True

        _LOGGER.debug(
            '_AsyncSocketConnector cancel requested when not ACTIVE: '
            'state=%s; %s', self._state, self._sock)
        return False

    @_log_exceptions
    def _report_completion(self, result):
        """Advance to COMPLETED state, remove socket watcher, and invoke user's
        completion callback.

        :param BaseException | None result: value to pass in user's callback

        """
        _LOGGER.debug('_AsyncSocketConnector._report_completion(%r); %s',
                      result, self._sock)

        assert isinstance(result, (BaseException, type(None))), (
            '_AsyncSocketConnector._report_completion() expected exception or '
            'None as result.', result)
        assert self._state == self._STATE_ACTIVE, (
            '_AsyncSocketConnector._report_completion() expected '
            '_STATE_ACTIVE', self._state)

        self._state = self._STATE_COMPLETED
        self._cleanup()

        self._on_done(result)

    @_log_exceptions
    def _start_async(self):
        """Called as callback from I/O loop to kick-start the workflow, so it's
        safe to call user's completion callback from here, if needed

        """
        if self._state != self._STATE_ACTIVE:
            # Must have been canceled by user before we were called
            _LOGGER.debug(
                'Abandoning sock=%s connection establishment to %s '
                'due to inactive state=%s', self._sock, self._addr, self._state)
            return

        try:
            self._sock.connect(self._addr)
        except (Exception, pika.compat.SOCKET_ERROR) as error:  # pylint: disable=W0703
            if (isinstance(error, pika.compat.SOCKET_ERROR) and
                    error.errno in _CONNECTION_IN_PROGRESS_SOCK_ERROR_CODES):
                # Connection establishment is pending
                pass
            else:
                _LOGGER.error('%s.connect(%s) failed: %r', self._sock,
                              self._addr, error)
                self._report_completion(error)
                return

        # Get notified when the socket becomes writable
        try:
            self._nbio.set_writer(self._sock.fileno(), self._on_writable)
        except Exception as error:  # pylint: disable=W0703
            _LOGGER.exception('async.set_writer(%s) failed: %r', self._sock,
                              error)
            self._report_completion(error)
            return
        else:
            self._watching_socket_events = True
            _LOGGER.debug('Connection-establishment is in progress for %s.',
                          self._sock)

    @_log_exceptions
    def _on_writable(self):
        """Called when the socket becomes writable, indicating that connection
        establishment either succeeded or failed. Check the outcome and invoke
        user's completion callback.

        """
        if self._state != self._STATE_ACTIVE:
            # This should never happen since we remove the watcher upon
            # `cancel()`
            _LOGGER.error(
                'Socket connection-establishment event watcher '
                'called in inactive state (ignoring): %s; state=%s', self._sock,
                self._state)
            return

        # The moment of truth...
        error_code = self._sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if not error_code:
            _LOGGER.info('Socket connected: %s', self._sock)
            result = None
        else:
            error_msg = os.strerror(error_code)
            _LOGGER.error('Socket failed to connect: %s; error=%s (%s)',
                          self._sock, error_code, error_msg)
            result = pika.compat.SOCKET_ERROR(error_code, error_msg)

        self._report_completion(result)


class _AsyncStreamConnector(object):
    """Performs asynchronous SSL session establishment, if requested, on the
    already-connected socket and links the streaming transport to protocol.
    Used for implementing
    `.nbio_interface.AbstractIOServices.create_streaming_connection()`.

    """
    _STATE_NOT_STARTED = 0  # start() not called yet
    _STATE_ACTIVE = 1  # start() called and kicked off the workflow
    _STATE_CANCELED = 2  # workflow terminated by cancel() request
    _STATE_COMPLETED = 3  # workflow terminated by success or failure

    def __init__(self, nbio, protocol_factory, sock, ssl_context,
                 server_hostname, on_done):
        """
        NOTE: We take ownership of the given socket upon successful completion
        of the constructor.

        See `AbstractIOServices.create_streaming_connection()` for detailed
        documentation of the corresponding args.

        :param AbstractIOServices | AbstractFileDescriptorServices nbio:
        :param callable protocol_factory:
        :param socket.socket sock:
        :param ssl.SSLContext | None ssl_context:
        :param str | None server_hostname:
        :param callable on_done:

        """
        check_callback_arg(protocol_factory, 'protocol_factory')
        check_callback_arg(on_done, 'on_done')

        if not isinstance(ssl_context, (type(None), ssl.SSLContext)):
            raise ValueError('Expected ssl_context=None | ssl.SSLContext, but '
                             'got {!r}'.format(ssl_context))

        if server_hostname is not None and ssl_context is None:
            raise ValueError('Non-None server_hostname must not be passed '
                             'without ssl context')

        # Check that socket connection establishment has completed in order
        # to avoid stalling while waiting for the socket to become readable
        # and/or writable.
        try:
            sock.getpeername()
        except Exception as error:
            raise ValueError(
                'Expected connected socket, but getpeername() failed: '
                'error={!r}; {}'.format(error, sock))

        self._nbio = nbio
        self._protocol_factory = protocol_factory
        self._sock = sock
        self._ssl_context = ssl_context
        self._server_hostname = server_hostname
        self._on_done = on_done

        self._state = self._STATE_NOT_STARTED
        self._watching_socket = False

    @_log_exceptions
    def _cleanup(self, close):
        """Cancel pending async operations, if any

        :param bool close: close the socket if true
        """
        _LOGGER.debug('_AsyncStreamConnector._cleanup(%r)', close)

        if self._watching_socket:
            _LOGGER.debug(
                '_AsyncStreamConnector._cleanup(%r): removing RdWr; %s', close,
                self._sock)
            self._watching_socket = False
            self._nbio.remove_reader(self._sock.fileno())
            self._nbio.remove_writer(self._sock.fileno())

        try:
            if close:
                _LOGGER.debug(
                    '_AsyncStreamConnector._cleanup(%r): closing socket; %s',
                    close, self._sock)
                try:
                    self._sock.close()
                except Exception as error:  # pylint: disable=W0703
                    _LOGGER.exception('_sock.close() failed: error=%r; %s',
                                      error, self._sock)
                    raise
        finally:
            self._sock = None
            self._nbio = None
            self._protocol_factory = None
            self._ssl_context = None
            self._server_hostname = None
            self._on_done = None

    def start(self):
        """Kick off the workflow

        :rtype: AbstractIOReference
        """
        _LOGGER.debug('_AsyncStreamConnector.start(); %s', self._sock)

        assert self._state == self._STATE_NOT_STARTED, (
            '_AsyncStreamConnector.start() expected '
            '_STATE_NOT_STARTED', self._state)

        self._state = self._STATE_ACTIVE

        # Request callback from I/O loop to start processing so that we don't
        # end up making callbacks from the caller's scope
        self._nbio.add_callback_threadsafe(self._start_async)

        return _AsyncServiceAsyncHandle(self)

    def cancel(self):
        """Cancel pending connection request without calling user's completion
        callback.

        :returns: False if was already done or cancelled; True otherwise
        :rtype: bool

        """
        if self._state == self._STATE_ACTIVE:
            self._state = self._STATE_CANCELED
            _LOGGER.debug('User canceled streaming linkup for %s', self._sock)
            # Close the socket, since we took ownership
            self._cleanup(close=True)
            return True

        _LOGGER.debug(
            '_AsyncStreamConnector cancel requested when not ACTIVE: '
            'state=%s; %s', self._state, self._sock)
        return False

    @_log_exceptions
    def _report_completion(self, result):
        """Advance to COMPLETED state, cancel async operation(s), and invoke
        user's completion callback.

        :param BaseException | tuple result: value to pass in user's callback.
            `tuple(transport, protocol)` on success, exception on error

        """
        _LOGGER.debug('_AsyncStreamConnector._report_completion(%r); %s',
                      result, self._sock)

        assert isinstance(result, (BaseException, tuple)), (
            '_AsyncStreamConnector._report_completion() expected exception or '
            'tuple as result.', result, self._state)
        assert self._state == self._STATE_ACTIVE, (
            '_AsyncStreamConnector._report_completion() expected '
            '_STATE_ACTIVE', self._state)

        self._state = self._STATE_COMPLETED

        # Notify user
        try:
            self._on_done(result)
        except Exception:
            _LOGGER.exception('%r: _on_done(%r) failed.',
                              self._report_completion, result)
            raise
        finally:
            # NOTE: Close the socket on error, since we took ownership of it
            self._cleanup(close=isinstance(result, BaseException))

    @_log_exceptions
    def _start_async(self):
        """Called as callback from I/O loop to kick-start the workflow, so it's
        safe to call user's completion callback from here if needed

        """
        _LOGGER.debug('_AsyncStreamConnector._start_async(); %s', self._sock)

        if self._state != self._STATE_ACTIVE:
            # Must have been canceled by user before we were called
            _LOGGER.debug(
                'Abandoning streaming linkup due to inactive state '
                'transition; state=%s; %s', self._state, self._sock)
            return

        # Link up protocol and transport if this is a plaintext linkup;
        # otherwise kick off the SSL workflow first
        if self._ssl_context is None:
            self._linkup()
        else:
            _LOGGER.debug('Starting SSL handshake on %s', self._sock)

            # Wrap our plain socket in ssl socket
            try:
                self._sock = self._ssl_context.wrap_socket(
                    self._sock,
                    server_side=False,
                    do_handshake_on_connect=False,
                    suppress_ragged_eofs=False,  # False = error on incoming EOF
                    server_hostname=self._server_hostname)
            except Exception as error:  # pylint: disable=W0703
                _LOGGER.exception('SSL wrap_socket(%s) failed: %r', self._sock,
                                  error)
                self._report_completion(error)
                return

            self._do_ssl_handshake()

    @_log_exceptions
    def _linkup(self):
        """Connection is ready: instantiate and link up transport and protocol,
        and invoke user's completion callback.

        """
        _LOGGER.debug('_AsyncStreamConnector._linkup()')

        transport = None

        try:
            # Create the protocol
            try:
                protocol = self._protocol_factory()
            except Exception as error:
                _LOGGER.exception('protocol_factory() failed: error=%r; %s',
                                  error, self._sock)
                raise

            if self._ssl_context is None:
                # Create plaintext streaming transport
                try:
                    transport = _AsyncPlaintextTransport(
                        self._sock, protocol, self._nbio)
                except Exception as error:
                    _LOGGER.exception('PlainTransport() failed: error=%r; %s',
                                      error, self._sock)
                    raise
            else:
                # Create SSL streaming transport
                try:
                    transport = _AsyncSSLTransport(self._sock, protocol,
                                                   self._nbio)
                except Exception as error:
                    _LOGGER.exception('SSLTransport() failed: error=%r; %s',
                                      error, self._sock)
                    raise

            _LOGGER.debug('_linkup(): created transport %r', transport)

            # Acquaint protocol with its transport
            try:
                protocol.connection_made(transport)
            except Exception as error:
                _LOGGER.exception(
                    'protocol.connection_made(%r) failed: error=%r; %s',
                    transport, error, self._sock)
                raise

            _LOGGER.debug('_linkup(): introduced transport to protocol %r; %r',
                          transport, protocol)
        except Exception as error:  # pylint: disable=W0703
            result = error
        else:
            result = (transport, protocol)

        self._report_completion(result)

    @_log_exceptions
    def _do_ssl_handshake(self):
        """Perform asynchronous SSL handshake on the already wrapped socket

        """
        _LOGGER.debug('_AsyncStreamConnector._do_ssl_handshake()')

        if self._state != self._STATE_ACTIVE:
            _LOGGER.debug(
                '_do_ssl_handshake: Abandoning streaming linkup due '
                'to inactive state transition; state=%s; %s', self._state,
                self._sock)
            return

        done = False

        try:
            try:
                self._sock.do_handshake()
            except ssl.SSLError as error:
                if error.errno == ssl.SSL_ERROR_WANT_READ:
                    _LOGGER.debug('SSL handshake wants read; %s.', self._sock)
                    self._watching_socket = True
                    self._nbio.set_reader(self._sock.fileno(),
                                          self._do_ssl_handshake)
                    self._nbio.remove_writer(self._sock.fileno())
                elif error.errno == ssl.SSL_ERROR_WANT_WRITE:
                    _LOGGER.debug('SSL handshake wants write. %s', self._sock)
                    self._watching_socket = True
                    self._nbio.set_writer(self._sock.fileno(),
                                          self._do_ssl_handshake)
                    self._nbio.remove_reader(self._sock.fileno())
                else:
                    # Outer catch will report it
                    raise
            else:
                done = True
                _LOGGER.info('SSL handshake completed successfully: %s',
                             self._sock)
        except Exception as error:  # pylint: disable=W0703
            _LOGGER.exception('SSL do_handshake failed: error=%r; %s', error,
                              self._sock)
            self._report_completion(error)
            return

        if done:
            # Suspend I/O and link up transport with protocol
            _LOGGER.debug(
                '_do_ssl_handshake: removing watchers ahead of linkup: %s',
                self._sock)
            self._nbio.remove_reader(self._sock.fileno())
            self._nbio.remove_writer(self._sock.fileno())
            # So that our `_cleanup()` won't interfere with the transport's
            # socket watcher configuration.
            self._watching_socket = False
            _LOGGER.debug(
                '_do_ssl_handshake: pre-linkup removal of watchers is done; %s',
                self._sock)

            self._linkup()


class _AsyncTransportBase(  # pylint: disable=W0223
        AbstractStreamTransport):
    """Base class for `_AsyncPlaintextTransport` and `_AsyncSSLTransport`.

    """

    _STATE_ACTIVE = 1
    _STATE_FAILED = 2  # connection failed
    _STATE_ABORTED_BY_USER = 3  # abort() called
    _STATE_COMPLETED = 4  # done with connection

    _MAX_RECV_BYTES = 4096  # per socket.recv() documentation recommendation

    # Max per consume call to prevent event starvation
    _MAX_CONSUME_BYTES = 1024 * 100

    class RxEndOfFile(OSError):
        """We raise this internally when EOF (empty read) is detected on input.

        """

        def __init__(self):
            super(_AsyncTransportBase.RxEndOfFile, self).__init__(
                -1, 'End of input stream (EOF)')

    def __init__(self, sock, protocol, nbio):
        """

        :param socket.socket | ssl.SSLSocket sock: connected socket
        :param pika.adapters.utils.nbio_interface.AbstractStreamProtocol protocol:
            corresponding protocol in this transport/protocol pairing; the
            protocol already had its `connection_made()` method called.
        :param AbstractIOServices | AbstractFileDescriptorServices nbio:

        """
        _LOGGER.debug('_AsyncTransportBase.__init__: %s', sock)
        self._sock = sock
        self._protocol = protocol
        self._nbio = nbio

        self._state = self._STATE_ACTIVE
        self._tx_buffers = collections.deque()
        self._tx_buffered_byte_count = 0

    def abort(self):
        """Close connection abruptly without waiting for pending I/O to
        complete. Will invoke the corresponding protocol's `connection_lost()`
        method asynchronously (not in context of the abort() call).

        :raises Exception: Exception-based exception on error
        """
        _LOGGER.info('Aborting transport connection: state=%s; %s', self._state,
                     self._sock)

        self._initiate_abort(None)

    def get_protocol(self):
        """Return the protocol linked to this transport.

        :rtype: pika.adapters.utils.nbio_interface.AbstractStreamProtocol
        """
        return self._protocol

    def get_write_buffer_size(self):
        """
        :returns: Current size of output data buffered by the transport
        :rtype: int
        """
        return self._tx_buffered_byte_count

    def _buffer_tx_data(self, data):
        """Buffer the given data until it can be sent asynchronously.

        :param bytes data:
        :raises ValueError: if called with empty data

        """
        if not data:
            _LOGGER.error('write() called with empty data: state=%s; %s',
                          self._state, self._sock)
            raise ValueError('write() called with empty data {!r}'.format(data))

        if self._state != self._STATE_ACTIVE:
            _LOGGER.debug(
                'Ignoring write() called during inactive state: '
                'state=%s; %s', self._state, self._sock)
            return

        self._tx_buffers.append(data)
        self._tx_buffered_byte_count += len(data)

    def _consume(self):
        """Utility method for use by subclasses to ingest data from the socket
        and dispatch it to the protocol's `data_received()` method until a
        socket-specific "try again" exception is raised, the per-event data
        consumption limit is reached, the transport becomes inactive, or a
        fatal failure occurs.

        Consumes up to `self._MAX_CONSUME_BYTES` to prevent event starvation or
        until state becomes inactive (e.g., `protocol.data_received()` callback
        aborts the transport)

        :raises: Whatever the corresponding `sock.recv()` raises except the
                 socket error with errno.EINTR
        :raises: Whatever the `protocol.data_received()` callback raises
        :raises _AsyncTransportBase.RxEndOfFile: upon shutdown of input stream

        """
        bytes_consumed = 0

        while (self._state == self._STATE_ACTIVE and
               bytes_consumed < self._MAX_CONSUME_BYTES):
            data = self._sigint_safe_recv(self._sock, self._MAX_RECV_BYTES)
            bytes_consumed += len(data)

            # Empty data, should disconnect
            if not data:
                _LOGGER.error('Socket EOF; %s', self._sock)
                raise self.RxEndOfFile()

            # Pass the data to the protocol
            try:
                self._protocol.data_received(data)
            except Exception as error:
                _LOGGER.exception(
                    'protocol.data_received() failed: error=%r; %s', error,
                    self._sock)
                raise

    def _produce(self):
        """Utility method for use by subclasses to emit data from
        `self._tx_buffers`. This method sends chunks from `self._tx_buffers`
        until all chunks are exhausted or sending is interrupted by an
        exception. Maintains integrity of `self._tx_buffers`.

        :raises: whatever the corresponding `sock.send()` raises except the
                 socket error with errno.EINTR

        """
        while self._tx_buffers:
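            # Send the chunk at the head of the queue; a short (partial) send
            # is normal for non-blocking sockets, so any unsent remainder is
            # pushed back onto the front of the deque to preserve byte order.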
            num_bytes_sent = self._sigint_safe_send(self._sock,
                                                    self._tx_buffers[0])

            chunk = self._tx_buffers.popleft()
            if num_bytes_sent < len(chunk):
                _LOGGER.debug('Partial send, requeuing remaining data; %s of %s',
                              num_bytes_sent, len(chunk))
                self._tx_buffers.appendleft(chunk[num_bytes_sent:])

            self._tx_buffered_byte_count -= num_bytes_sent
            assert self._tx_buffered_byte_count >= 0, (
                '_AsyncTransportBase._produce() tx buffer size underflow',
                self._tx_buffered_byte_count, self._state)

    @staticmethod
    @_retry_on_sigint
    def _sigint_safe_recv(sock, max_bytes):
        """Receive data from socket, retrying on SIGINT.

        :param sock: stream or SSL socket
        :param max_bytes: maximum number of bytes to receive
        :returns: received data or empty bytes upon end of file
        :rtype: bytes
        :raises: whatever the corresponding `sock.recv()` raises except socket
                 error with errno.EINTR

        """
        return sock.recv(max_bytes)

    @staticmethod
    @_retry_on_sigint
    def _sigint_safe_send(sock, data):
        """Send data to socket, retrying on SIGINT.

        :param sock: stream or SSL socket
        :param data: data bytes to send
        :returns: number of bytes actually sent
        :rtype: int
        :raises: whatever the corresponding `sock.send()` raises except socket
                 error with errno.EINTR

        """
        return sock.send(data)

    @_log_exceptions
    def _deactivate(self):
        """Unregister the transport from I/O events

        """
        if self._state == self._STATE_ACTIVE:
            _LOGGER.info('Deactivating transport: state=%s; %s', self._state,
                         self._sock)
            self._nbio.remove_reader(self._sock.fileno())
            self._nbio.remove_writer(self._sock.fileno())
            self._tx_buffers.clear()

    @_log_exceptions
    def _close_and_finalize(self):
        """Close the transport's socket and unlink the transport from
        references to other assets (protocol, etc.)

        """
        if self._state != self._STATE_COMPLETED:
            _LOGGER.info('Closing transport socket and unlinking: state=%s; %s',
                         self._state, self._sock)
            try:
                self._sock.shutdown(socket.SHUT_RDWR)
            except pika.compat.SOCKET_ERROR:
                pass
            self._sock.close()
            self._sock = None
            self._protocol = None
            self._nbio = None
            self._state = self._STATE_COMPLETED

    @_log_exceptions
    def _initiate_abort(self, error):
        """Initiate asynchronous abort of the transport that concludes with a
        call to the protocol's `connection_lost()` method. No flushing of
        output buffers will take place.

        :param BaseException | None error: None if being aborted by user,
            including via a falsy return value from protocol.eof_received;
            otherwise the exception corresponding to the failed connection.
        """
        _LOGGER.info(
            '_AsyncTransportBase._initiate_abort(): Initiating abrupt '
            'asynchronous transport shutdown: state=%s; error=%r; %s',
            self._state, error, self._sock)

        assert self._state != self._STATE_COMPLETED, (
            '_AsyncTransportBase._initiate_abort() expected '
            'non-_STATE_COMPLETED', self._state)

        if self._state == self._STATE_COMPLETED:
            return

        self._deactivate()

        # Update state
        if error is None:
            # Being aborted by user

            if self._state == self._STATE_ABORTED_BY_USER:
                # Abort by user already pending
                _LOGGER.debug('_AsyncTransportBase._initiate_abort(): '
                              'ignoring - user-abort already pending.')
                return

            # Notification priority is given to user-initiated abort over
            # failed connection
            self._state = self._STATE_ABORTED_BY_USER
        else:
            # Connection failed

            if self._state != self._STATE_ACTIVE:
                assert self._state == self._STATE_ABORTED_BY_USER, (
                    '_AsyncTransportBase._initiate_abort() expected '
                    '_STATE_ABORTED_BY_USER', self._state)
                return

            self._state = self._STATE_FAILED

        # Schedule callback from I/O loop to avoid potential reentry into user
        # code
        self._nbio.add_callback_threadsafe(
            functools.partial(self._connection_lost_notify_async, error))

    @_log_exceptions
    def _connection_lost_notify_async(self, error):
        """Handle aborting of the transport, whether due to a socket error or a
        user-initiated `abort()` call. Must be called from an I/O loop callback
        owned by us in order to avoid reentry into user code from the user's API
        call into the transport.

        :param BaseException | None error: None if being aborted by user;
            otherwise the exception corresponding to the failed connection.
        """
        _LOGGER.debug('Concluding transport shutdown: state=%s; error=%r',
                      self._state, error)

        if self._state == self._STATE_COMPLETED:
            return

        if error is not None and self._state != self._STATE_FAILED:
            # Priority is given to user-initiated abort notification
            assert self._state == self._STATE_ABORTED_BY_USER, (
                '_AsyncTransportBase._connection_lost_notify_async() '
                'expected _STATE_ABORTED_BY_USER', self._state)
            return

        # Inform protocol
        try:
            self._protocol.connection_lost(error)
        except Exception as exc:  # pylint: disable=W0703
            _LOGGER.exception('protocol.connection_lost(%r) failed: exc=%r; %s',
                              error, exc, self._sock)
            # Re-raise, since we've exhausted our normal failure notification
            # mechanism (i.e., connection_lost())
            raise
        finally:
            self._close_and_finalize()


class _AsyncPlaintextTransport(_AsyncTransportBase):
    """Implementation of `nbio_interface.AbstractStreamTransport` for a
    plaintext connection.

    """

    def __init__(self, sock, protocol, nbio):
        """

        :param socket.socket sock: non-blocking connected socket
        :param pika.adapters.utils.nbio_interface.AbstractStreamProtocol protocol:
            corresponding protocol in this transport/protocol pairing; the
            protocol already had its `connection_made()` method called.
        :param AbstractIOServices | AbstractFileDescriptorServices nbio:

        """
        super(_AsyncPlaintextTransport, self).__init__(sock, protocol, nbio)

        # Request to be notified of incoming data; we'll watch for writability
        # only when our write buffer is non-empty
        self._nbio.set_reader(self._sock.fileno(), self._on_socket_readable)

    def write(self, data):
        """Buffer the given data until it can be sent asynchronously.

        :param bytes data:
        :raises ValueError: if called with empty data

        """
        if self._state != self._STATE_ACTIVE:
            _LOGGER.debug(
                'Ignoring write() called during inactive state: '
                'state=%s; %s', self._state, self._sock)
            return

        assert data, ('_AsyncPlaintextTransport.write(): empty data from user.',
                      data, self._state)

        if not self.get_write_buffer_size():
            self._nbio.set_writer(self._sock.fileno(), self._on_socket_writable)
            _LOGGER.debug('Turned on writability watcher: %s', self._sock)

        self._buffer_tx_data(data)

    @_log_exceptions
    def _on_socket_readable(self):
        """Ingest data from the socket and dispatch it to the protocol until an
        exception occurs (typically EAGAIN or EWOULDBLOCK), the per-event data
        consumption limit is reached, the transport becomes inactive, or a
        failure occurs.

        """
        if self._state != self._STATE_ACTIVE:
            _LOGGER.debug(
                'Ignoring readability notification due to inactive '
                'state: state=%s; %s', self._state, self._sock)
            return

        try:
            self._consume()
        except self.RxEndOfFile:
            try:
                keep_open = self._protocol.eof_received()
            except Exception as error:  # pylint: disable=W0703
                _LOGGER.exception(
                    'protocol.eof_received() failed: error=%r; %s', error,
                    self._sock)
                self._initiate_abort(error)
            else:
                if keep_open:
                    _LOGGER.info(
                        'protocol.eof_received() elected to keep open: %s',
                        self._sock)
                    self._nbio.remove_reader(self._sock.fileno())
                else:
                    _LOGGER.info('protocol.eof_received() elected to close: %s',
                                 self._sock)
                    self._initiate_abort(None)
        except (Exception, pika.compat.SOCKET_ERROR) as error:  # pylint: disable=W0703
            if (isinstance(error, pika.compat.SOCKET_ERROR) and
                    error.errno in _TRY_IO_AGAIN_SOCK_ERROR_CODES):
                _LOGGER.debug('Recv would block on %s', self._sock)
            else:
                _LOGGER.exception(
                    '_AsyncTransportBase._consume() failed, aborting '
                    'connection: error=%r; sock=%s; Caller\'s stack:\n%s',
                    error, self._sock, ''.join(
                        traceback.format_exception(*sys.exc_info())))
                self._initiate_abort(error)
        else:
            if self._state != self._STATE_ACTIVE:
                # Most likely our protocol's `data_received()` aborted the
                # transport
                _LOGGER.debug(
                    'Leaving Plaintext consumer due to inactive '
                    'state: state=%s; %s', self._state, self._sock)

    @_log_exceptions
    def _on_socket_writable(self):
        """Handle writable socket notification

        """
        if self._state != self._STATE_ACTIVE:
            _LOGGER.debug(
                'Ignoring writability notification due to inactive '
                'state: state=%s; %s', self._state, self._sock)
            return

        # We shouldn't be getting called with empty tx buffers
        assert self._tx_buffers, (
            '_AsyncPlaintextTransport._on_socket_writable() called, '
            'but _tx_buffers is empty.', self._state)

        try:
            # Transmit buffered data to remote socket
            self._produce()
        except (Exception, pika.compat.SOCKET_ERROR) as error:  # pylint: disable=W0703
            if (isinstance(error, pika.compat.SOCKET_ERROR) and
                    error.errno in _TRY_IO_AGAIN_SOCK_ERROR_CODES):
                _LOGGER.debug('Send would block on %s', self._sock)
            else:
                _LOGGER.exception(
                    '_AsyncTransportBase._produce() failed, aborting '
                    'connection: error=%r; sock=%s; Caller\'s stack:\n%s',
                    error, self._sock, ''.join(
                        traceback.format_exception(*sys.exc_info())))
                self._initiate_abort(error)
        else:
            if not self._tx_buffers:
                self._nbio.remove_writer(self._sock.fileno())
                _LOGGER.debug('Turned off writability watcher: %s', self._sock)


class _AsyncSSLTransport(_AsyncTransportBase):
    """Implementation of `.nbio_interface.AbstractStreamTransport` for an SSL
    connection.

    """

    def __init__(self, sock, protocol, nbio):
        """

        :param ssl.SSLSocket sock: non-blocking connected socket
        :param pika.adapters.utils.nbio_interface.AbstractStreamProtocol protocol:
            corresponding protocol in this transport/protocol pairing; the
            protocol already had its `connection_made()` method called.
        :param AbstractIOServices | AbstractFileDescriptorServices nbio:

        """
        super(_AsyncSSLTransport, self).__init__(sock, protocol, nbio)

        self._ssl_readable_action = self._consume
        self._ssl_writable_action = None
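        # These "action" attributes record what to do when the socket becomes
        # readable/writable. Because TLS renegotiation can make recv() wait
        # for writability and send() wait for readability, _consume() and
        # _produce() swap these registrations as needed (see those methods).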

        # Bootstrap consumer; we'll take care of producer once data is buffered
        self._nbio.set_reader(self._sock.fileno(), self._on_socket_readable)
        # Try reading asap just in case read-ahead already buffered some data
        self._nbio.add_callback_threadsafe(self._on_socket_readable)

    def write(self, data):
        """Buffer the given data until it can be sent asynchronously.

        :param bytes data:
        :raises ValueError: if called with empty data

        """
        if self._state != self._STATE_ACTIVE:
            _LOGGER.debug(
                'Ignoring write() called during inactive state: '
                'state=%s; %s', self._state, self._sock)
            return

        tx_buffer_was_empty = self.get_write_buffer_size() == 0

        self._buffer_tx_data(data)

        if tx_buffer_was_empty and self._ssl_writable_action is None:
            self._ssl_writable_action = self._produce
            self._nbio.set_writer(self._sock.fileno(), self._on_socket_writable)
            _LOGGER.debug('Turned on writability watcher: %s', self._sock)

    @_log_exceptions
    def _on_socket_readable(self):
        """Handle readable socket indication

        """
        if self._state != self._STATE_ACTIVE:
            _LOGGER.debug(
                'Ignoring readability notification due to inactive '
                'state: state=%s; %s', self._state, self._sock)
            return

        if self._ssl_readable_action:
            try:
                self._ssl_readable_action()
            except Exception as error:  # pylint: disable=W0703
                self._initiate_abort(error)
        else:
            _LOGGER.debug(
                'SSL readable action was suppressed: '
                'ssl_writable_action=%r; %s', self._ssl_writable_action,
                self._sock)

    @_log_exceptions
    def _on_socket_writable(self):
        """Handle writable socket notification

        """
        if self._state != self._STATE_ACTIVE:
            _LOGGER.debug(
                'Ignoring writability notification due to inactive '
                'state: state=%s; %s', self._state, self._sock)
            return

        if self._ssl_writable_action:
            try:
                self._ssl_writable_action()
            except Exception as error:  # pylint: disable=W0703
                self._initiate_abort(error)
        else:
            _LOGGER.debug(
                'SSL writable action was suppressed: '
                'ssl_readable_action=%r; %s', self._ssl_readable_action,
                self._sock)

    @_log_exceptions
    def _consume(self):
        """[override] Ingest data from the socket and dispatch it to the
        protocol until an exception occurs (typically ssl.SSLError with
        SSL_ERROR_WANT_READ/WRITE), the per-event data consumption limit is
        reached, the transport becomes inactive, or a failure occurs.

        Update consumer/producer registration.

        :raises Exception: error that signals that the connection needs to be
            aborted
        """
        next_consume_on_readable = True

        try:
            super(_AsyncSSLTransport, self)._consume()
        except ssl.SSLError as error:
            if error.errno == ssl.SSL_ERROR_WANT_READ:
                _LOGGER.debug('SSL ingester wants read: %s', self._sock)
            elif error.errno == ssl.SSL_ERROR_WANT_WRITE:
                # Looks like SSL re-negotiation
                _LOGGER.debug('SSL ingester wants write: %s', self._sock)
                next_consume_on_readable = False
            else:
                _LOGGER.exception(
                    '_AsyncTransportBase._consume() failed, aborting '
                    'connection: error=%r; sock=%s; Caller\'s stack:\n%s',
                    error, self._sock, ''.join(
                        traceback.format_exception(*sys.exc_info())))
                raise  # let outer catch block abort the transport
        else:
            if self._state != self._STATE_ACTIVE:
                # Most likely our protocol's `data_received()` aborted the
                # transport
                _LOGGER.debug(
                    'Leaving SSL consumer due to inactive '
                    'state: state=%s; %s', self._state, self._sock)
                return

            # Consumer exited without exception; there may still be more,
            # possibly unprocessed, data records in SSL input buffers that
            # can be read without waiting for socket to become readable.

            # In case buffered input SSL data records still remain
            self._nbio.add_callback_threadsafe(self._on_socket_readable)

        # Update consumer registration
        if next_consume_on_readable:
            if not self._ssl_readable_action:
                self._nbio.set_reader(self._sock.fileno(),
                                      self._on_socket_readable)
            self._ssl_readable_action = self._consume

            # NOTE: can't use identity check, it fails for instance methods
            if self._ssl_writable_action == self._consume: # pylint: disable=W0143
                self._nbio.remove_writer(self._sock.fileno())
                self._ssl_writable_action = None
        else:
            # WANT_WRITE
            if not self._ssl_writable_action:
                self._nbio.set_writer(self._sock.fileno(),
                                      self._on_socket_writable)
            self._ssl_writable_action = self._consume

            if self._ssl_readable_action:
                self._nbio.remove_reader(self._sock.fileno())
                self._ssl_readable_action = None

        # Update producer registration
        if self._tx_buffers and not self._ssl_writable_action:
            self._ssl_writable_action = self._produce
            self._nbio.set_writer(self._sock.fileno(), self._on_socket_writable)

    @_log_exceptions
    def _produce(self):
        """[override] Emit data from tx_buffers until all chunks are exhausted
        or sending is interrupted by an exception (typically ssl.SSLError with
        SSL_ERROR_WANT_READ/WRITE).

        Update consumer/producer registration.

        :raises Exception: error that signals that the connection needs to be
            aborted

        """
        next_produce_on_writable = None  # None means no need to produce

        try:
            super(_AsyncSSLTransport, self)._produce()
        except ssl.SSLError as error:
            if error.errno == ssl.SSL_ERROR_WANT_READ:
                # Looks like SSL re-negotiation
                _LOGGER.debug('SSL emitter wants read: %s', self._sock)
                next_produce_on_writable = False
            elif error.errno == ssl.SSL_ERROR_WANT_WRITE:
                _LOGGER.debug('SSL emitter wants write: %s', self._sock)
                next_produce_on_writable = True
            else:
                _LOGGER.exception(
                    '_AsyncTransportBase._produce() failed, aborting '
                    'connection: error=%r; sock=%s; Caller\'s stack:\n%s',
                    error, self._sock, ''.join(
                        traceback.format_exception(*sys.exc_info())))
                raise  # let outer catch block abort the transport
        else:
            # No exception, so everything must have been written to the socket
            assert not self._tx_buffers, (
                '_AsyncSSLTransport._produce(): no exception from parent '
                'class, but data remains in _tx_buffers.', len(
                    self._tx_buffers))

        # Update producer registration
        if self._tx_buffers:
            assert next_produce_on_writable is not None, (
                '_AsyncSSLTransport._produce(): next_produce_on_writable is '
                'still None', self._state)

            if next_produce_on_writable:
                if not self._ssl_writable_action:
                    self._nbio.set_writer(self._sock.fileno(),
                                          self._on_socket_writable)
                self._ssl_writable_action = self._produce

                # NOTE: can't use identity check, it fails for instance methods
                if self._ssl_readable_action == self._produce: # pylint: disable=W0143
                    self._nbio.remove_reader(self._sock.fileno())
                    self._ssl_readable_action = None
            else:
                # WANT_READ
                if not self._ssl_readable_action:
                    self._nbio.set_reader(self._sock.fileno(),
                                          self._on_socket_readable)
                self._ssl_readable_action = self._produce

                if self._ssl_writable_action:
                    self._nbio.remove_writer(self._sock.fileno())
                    self._ssl_writable_action = None
        else:
            # NOTE: can't use identity check, it fails for instance methods
            if self._ssl_readable_action == self._produce: # pylint: disable=W0143
                self._nbio.remove_reader(self._sock.fileno())
                self._ssl_readable_action = None
                assert self._ssl_writable_action != self._produce, ( # pylint: disable=W0143
                    '_AsyncSSLTransport._produce(): with empty tx_buffers, '
                    'writable_action cannot be _produce when readable is '
                    '_produce', self._state)
            else:
                # NOTE: can't use identity check, it fails for instance methods
                assert self._ssl_writable_action == self._produce, ( # pylint: disable=W0143
                    '_AsyncSSLTransport._produce(): with empty tx_buffers, '
                    'expected writable_action as _produce when readable_action '
                    'is not _produce', 'writable_action:',
                    self._ssl_writable_action, 'readable_action:',
                    self._ssl_readable_action, 'state:', self._state)
                self._ssl_writable_action = None
                self._nbio.remove_writer(self._sock.fileno())

        # Update consumer registration
        if not self._ssl_readable_action:
            self._ssl_readable_action = self._consume
            self._nbio.set_reader(self._sock.fileno(), self._on_socket_readable)
            # In case input SSL data records have been buffered
            self._nbio.add_callback_threadsafe(self._on_socket_readable)
        elif self._sock.pending():
            self._nbio.add_callback_threadsafe(self._on_socket_readable)