1"""Utilities for implementing `nbio_interface.AbstractIOServices` for 2pika connection adapters. 3 4""" 5 6import collections 7import errno 8import functools 9import logging 10import numbers 11import os 12import socket 13import ssl 14import sys 15import traceback 16 17from pika.adapters.utils.nbio_interface import (AbstractIOReference, 18 AbstractStreamTransport) 19import pika.compat 20import pika.diagnostic_utils 21 22# "Try again" error codes for non-blocking socket I/O - send()/recv(). 23# NOTE: POSIX.1 allows either error to be returned for this case and doesn't require 24# them to have the same value. 25_TRY_IO_AGAIN_SOCK_ERROR_CODES = ( 26 errno.EAGAIN, 27 errno.EWOULDBLOCK, 28) 29 30# "Connection establishment pending" error codes for non-blocking socket 31# connect() call. 32# NOTE: EINPROGRESS for Posix and EWOULDBLOCK for Windows 33_CONNECTION_IN_PROGRESS_SOCK_ERROR_CODES = ( 34 errno.EINPROGRESS, 35 errno.EWOULDBLOCK, 36) 37 38_LOGGER = logging.getLogger(__name__) 39 40# Decorator that logs exceptions escaping from the decorated function 41_log_exceptions = pika.diagnostic_utils.create_log_exception_decorator(_LOGGER) # pylint: disable=C0103 42 43 44def check_callback_arg(callback, name): 45 """Raise TypeError if callback is not callable 46 47 :param callback: callback to check 48 :param name: Name to include in exception text 49 :raises TypeError: 50 51 """ 52 if not callable(callback): 53 raise TypeError('{} must be callable, but got {!r}'.format( 54 name, callback)) 55 56 57def check_fd_arg(fd): 58 """Raise TypeError if file descriptor is not an integer 59 60 :param fd: file descriptor 61 :raises TypeError: 62 63 """ 64 if not isinstance(fd, numbers.Integral): 65 raise TypeError( 66 'Paramter must be a file descriptor, but got {!r}'.format(fd)) 67 68 69def _retry_on_sigint(func): 70 """Function decorator for retrying on SIGINT. 71 72 """ 73 74 @functools.wraps(func) 75 def retry_sigint_wrap(*args, **kwargs): 76 """Wrapper for decorated function""" 77 while True: 78 try: 79 return func(*args, **kwargs) 80 except pika.compat.SOCKET_ERROR as error: 81 if error.errno == errno.EINTR: 82 continue 83 else: 84 raise 85 86 return retry_sigint_wrap 87 88 89class SocketConnectionMixin(object): 90 """Implements 91 `pika.adapters.utils.nbio_interface.AbstractIOServices.connect_socket()` 92 on top of 93 `pika.adapters.utils.nbio_interface.AbstractFileDescriptorServices` and 94 basic `pika.adapters.utils.nbio_interface.AbstractIOServices`. 95 96 """ 97 98 def connect_socket(self, sock, resolved_addr, on_done): 99 """Implement 100 :py:meth:`.nbio_interface.AbstractIOServices.connect_socket()`. 101 102 """ 103 return _AsyncSocketConnector( 104 nbio=self, sock=sock, resolved_addr=resolved_addr, 105 on_done=on_done).start() 106 107 108class StreamingConnectionMixin(object): 109 """Implements 110 `.nbio_interface.AbstractIOServices.create_streaming_connection()` on 111 top of `.nbio_interface.AbstractFileDescriptorServices` and basic 112 `nbio_interface.AbstractIOServices` services. 113 114 """ 115 116 def create_streaming_connection(self, 117 protocol_factory, 118 sock, 119 on_done, 120 ssl_context=None, 121 server_hostname=None): 122 """Implement 123 :py:meth:`.nbio_interface.AbstractIOServices.create_streaming_connection()`. 124 125 """ 126 try: 127 return _AsyncStreamConnector( 128 nbio=self, 129 protocol_factory=protocol_factory, 130 sock=sock, 131 ssl_context=ssl_context, 132 server_hostname=server_hostname, 133 on_done=on_done).start() 134 except Exception as error: 135 _LOGGER.error('create_streaming_connection(%s) failed: %r', sock, 136 error) 137 # Close the socket since this function takes ownership 138 try: 139 sock.close() 140 except Exception as error: # pylint: disable=W0703 141 # We log and suppress the exception from sock.close() so that 142 # the original error from _AsyncStreamConnector constructor will 143 # percolate 144 _LOGGER.error('%s.close() failed: %r', sock, error) 145 146 raise 147 148 149class _AsyncServiceAsyncHandle(AbstractIOReference): 150 """This module's adaptation of `.nbio_interface.AbstractIOReference` 151 152 """ 153 154 def __init__(self, subject): 155 """ 156 :param subject: subject of the reference containing a `cancel()` method 157 158 """ 159 self._cancel = subject.cancel 160 161 def cancel(self): 162 """Cancel pending operation 163 164 :returns: False if was already done or cancelled; True otherwise 165 :rtype: bool 166 167 """ 168 return self._cancel() 169 170 171class _AsyncSocketConnector(object): 172 """Connects the given non-blocking socket asynchronously using 173 `.nbio_interface.AbstractFileDescriptorServices` and basic 174 `.nbio_interface.AbstractIOServices`. Used for implementing 175 `.nbio_interface.AbstractIOServices.connect_socket()`. 176 """ 177 178 _STATE_NOT_STARTED = 0 # start() not called yet 179 _STATE_ACTIVE = 1 # workflow started 180 _STATE_CANCELED = 2 # workflow aborted by user's cancel() call 181 _STATE_COMPLETED = 3 # workflow completed: succeeded or failed 182 183 def __init__(self, nbio, sock, resolved_addr, on_done): 184 """ 185 :param AbstractIOServices | AbstractFileDescriptorServices nbio: 186 :param socket.socket sock: non-blocking socket that needs to be 187 connected via `socket.socket.connect()` 188 :param tuple resolved_addr: resolved destination address/port two-tuple 189 which is compatible with the given's socket's address family 190 :param callable on_done: user callback that takes None upon successful 191 completion or exception upon error (check for `BaseException`) as 192 its only arg. It will not be called if the operation was cancelled. 193 :raises ValueError: if host portion of `resolved_addr` is not an IP 194 address or is inconsistent with the socket's address family as 195 validated via `socket.inet_pton()` 196 """ 197 check_callback_arg(on_done, 'on_done') 198 199 try: 200 socket.inet_pton(sock.family, resolved_addr[0]) 201 except Exception as error: # pylint: disable=W0703 202 if not hasattr(socket, 'inet_pton'): 203 _LOGGER.debug( 204 'Unable to check resolved address: no socket.inet_pton().') 205 else: 206 msg = ('Invalid or unresolved IP address ' 207 '{!r} for socket {}: {!r}').format( 208 resolved_addr, sock, error) 209 _LOGGER.error(msg) 210 raise ValueError(msg) 211 212 self._nbio = nbio 213 self._sock = sock 214 self._addr = resolved_addr 215 self._on_done = on_done 216 self._state = self._STATE_NOT_STARTED 217 self._watching_socket_events = False 218 219 @_log_exceptions 220 def _cleanup(self): 221 """Remove socket watcher, if any 222 223 """ 224 if self._watching_socket_events: 225 self._watching_socket_events = False 226 self._nbio.remove_writer(self._sock.fileno()) 227 228 def start(self): 229 """Start asynchronous connection establishment. 230 231 :rtype: AbstractIOReference 232 """ 233 assert self._state == self._STATE_NOT_STARTED, ( 234 '_AsyncSocketConnector.start(): expected _STATE_NOT_STARTED', 235 self._state) 236 237 self._state = self._STATE_ACTIVE 238 239 # Continue the rest of the operation on the I/O loop to avoid calling 240 # user's completion callback from the scope of user's call 241 self._nbio.add_callback_threadsafe(self._start_async) 242 243 return _AsyncServiceAsyncHandle(self) 244 245 def cancel(self): 246 """Cancel pending connection request without calling user's completion 247 callback. 248 249 :returns: False if was already done or cancelled; True otherwise 250 :rtype: bool 251 252 """ 253 if self._state == self._STATE_ACTIVE: 254 self._state = self._STATE_CANCELED 255 _LOGGER.debug('User canceled connection request for %s to %s', 256 self._sock, self._addr) 257 self._cleanup() 258 return True 259 260 _LOGGER.debug( 261 '_AsyncSocketConnector cancel requested when not ACTIVE: ' 262 'state=%s; %s', self._state, self._sock) 263 return False 264 265 @_log_exceptions 266 def _report_completion(self, result): 267 """Advance to COMPLETED state, remove socket watcher, and invoke user's 268 completion callback. 269 270 :param BaseException | None result: value to pass in user's callback 271 272 """ 273 _LOGGER.debug('_AsyncSocketConnector._report_completion(%r); %s', 274 result, self._sock) 275 276 assert isinstance(result, (BaseException, type(None))), ( 277 '_AsyncSocketConnector._report_completion() expected exception or ' 278 'None as result.', result) 279 assert self._state == self._STATE_ACTIVE, ( 280 '_AsyncSocketConnector._report_completion() expected ' 281 '_STATE_NOT_STARTED', self._state) 282 283 self._state = self._STATE_COMPLETED 284 self._cleanup() 285 286 self._on_done(result) 287 288 @_log_exceptions 289 def _start_async(self): 290 """Called as callback from I/O loop to kick-start the workflow, so it's 291 safe to call user's completion callback from here, if needed 292 293 """ 294 if self._state != self._STATE_ACTIVE: 295 # Must have been canceled by user before we were called 296 _LOGGER.debug( 297 'Abandoning sock=%s connection establishment to %s ' 298 'due to inactive state=%s', self._sock, self._addr, self._state) 299 return 300 301 try: 302 self._sock.connect(self._addr) 303 except (Exception, pika.compat.SOCKET_ERROR) as error: # pylint: disable=W0703 304 if (isinstance(error, pika.compat.SOCKET_ERROR) and 305 error.errno in _CONNECTION_IN_PROGRESS_SOCK_ERROR_CODES): 306 # Connection establishment is pending 307 pass 308 else: 309 _LOGGER.error('%s.connect(%s) failed: %r', self._sock, 310 self._addr, error) 311 self._report_completion(error) 312 return 313 314 # Get notified when the socket becomes writable 315 try: 316 self._nbio.set_writer(self._sock.fileno(), self._on_writable) 317 except Exception as error: # pylint: disable=W0703 318 _LOGGER.exception('async.set_writer(%s) failed: %r', self._sock, 319 error) 320 self._report_completion(error) 321 return 322 else: 323 self._watching_socket_events = True 324 _LOGGER.debug('Connection-establishment is in progress for %s.', 325 self._sock) 326 327 @_log_exceptions 328 def _on_writable(self): 329 """Called when socket connects or fails to. Check for predicament and 330 invoke user's completion callback. 331 332 """ 333 if self._state != self._STATE_ACTIVE: 334 # This should never happen since we remove the watcher upon 335 # `cancel()` 336 _LOGGER.error( 337 'Socket connection-establishment event watcher ' 338 'called in inactive state (ignoring): %s; state=%s', self._sock, 339 self._state) 340 return 341 342 # The moment of truth... 343 error_code = self._sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) 344 if not error_code: 345 _LOGGER.info('Socket connected: %s', self._sock) 346 result = None 347 else: 348 error_msg = os.strerror(error_code) 349 _LOGGER.error('Socket failed to connect: %s; error=%s (%s)', 350 self._sock, error_code, error_msg) 351 result = pika.compat.SOCKET_ERROR(error_code, error_msg) 352 353 self._report_completion(result) 354 355 356class _AsyncStreamConnector(object): 357 """Performs asynchronous SSL session establishment, if requested, on the 358 already-connected socket and links the streaming transport to protocol. 359 Used for implementing 360 `.nbio_interface.AbstractIOServices.create_streaming_connection()`. 361 362 """ 363 _STATE_NOT_STARTED = 0 # start() not called yet 364 _STATE_ACTIVE = 1 # start() called and kicked off the workflow 365 _STATE_CANCELED = 2 # workflow terminated by cancel() request 366 _STATE_COMPLETED = 3 # workflow terminated by success or failure 367 368 def __init__(self, nbio, protocol_factory, sock, ssl_context, 369 server_hostname, on_done): 370 """ 371 NOTE: We take ownership of the given socket upon successful completion 372 of the constructor. 373 374 See `AbstractIOServices.create_streaming_connection()` for detailed 375 documentation of the corresponding args. 376 377 :param AbstractIOServices | AbstractFileDescriptorServices nbio: 378 :param callable protocol_factory: 379 :param socket.socket sock: 380 :param ssl.SSLContext | None ssl_context: 381 :param str | None server_hostname: 382 :param callable on_done: 383 384 """ 385 check_callback_arg(protocol_factory, 'protocol_factory') 386 check_callback_arg(on_done, 'on_done') 387 388 if not isinstance(ssl_context, (type(None), ssl.SSLContext)): 389 raise ValueError('Expected ssl_context=None | ssl.SSLContext, but ' 390 'got {!r}'.format(ssl_context)) 391 392 if server_hostname is not None and ssl_context is None: 393 raise ValueError('Non-None server_hostname must not be passed ' 394 'without ssl context') 395 396 # Check that the socket connection establishment had completed in order 397 # to avoid stalling while waiting for the socket to become readable 398 # and/or writable. 399 try: 400 sock.getpeername() 401 except Exception as error: 402 raise ValueError( 403 'Expected connected socket, but getpeername() failed: ' 404 'error={!r}; {}; '.format(error, sock)) 405 406 self._nbio = nbio 407 self._protocol_factory = protocol_factory 408 self._sock = sock 409 self._ssl_context = ssl_context 410 self._server_hostname = server_hostname 411 self._on_done = on_done 412 413 self._state = self._STATE_NOT_STARTED 414 self._watching_socket = False 415 416 @_log_exceptions 417 def _cleanup(self, close): 418 """Cancel pending async operations, if any 419 420 :param bool close: close the socket if true 421 """ 422 _LOGGER.debug('_AsyncStreamConnector._cleanup(%r)', close) 423 424 if self._watching_socket: 425 _LOGGER.debug( 426 '_AsyncStreamConnector._cleanup(%r): removing RdWr; %s', close, 427 self._sock) 428 self._watching_socket = False 429 self._nbio.remove_reader(self._sock.fileno()) 430 self._nbio.remove_writer(self._sock.fileno()) 431 432 try: 433 if close: 434 _LOGGER.debug( 435 '_AsyncStreamConnector._cleanup(%r): closing socket; %s', 436 close, self._sock) 437 try: 438 self._sock.close() 439 except Exception as error: # pylint: disable=W0703 440 _LOGGER.exception('_sock.close() failed: error=%r; %s', 441 error, self._sock) 442 raise 443 finally: 444 self._sock = None 445 self._nbio = None 446 self._protocol_factory = None 447 self._ssl_context = None 448 self._server_hostname = None 449 self._on_done = None 450 451 def start(self): 452 """Kick off the workflow 453 454 :rtype: AbstractIOReference 455 """ 456 _LOGGER.debug('_AsyncStreamConnector.start(); %s', self._sock) 457 458 assert self._state == self._STATE_NOT_STARTED, ( 459 '_AsyncStreamConnector.start() expected ' 460 '_STATE_NOT_STARTED', self._state) 461 462 self._state = self._STATE_ACTIVE 463 464 # Request callback from I/O loop to start processing so that we don't 465 # end up making callbacks from the caller's scope 466 self._nbio.add_callback_threadsafe(self._start_async) 467 468 return _AsyncServiceAsyncHandle(self) 469 470 def cancel(self): 471 """Cancel pending connection request without calling user's completion 472 callback. 473 474 :returns: False if was already done or cancelled; True otherwise 475 :rtype: bool 476 477 """ 478 if self._state == self._STATE_ACTIVE: 479 self._state = self._STATE_CANCELED 480 _LOGGER.debug('User canceled streaming linkup for %s', self._sock) 481 # Close the socket, since we took ownership 482 self._cleanup(close=True) 483 return True 484 485 _LOGGER.debug( 486 '_AsyncStreamConnector cancel requested when not ACTIVE: ' 487 'state=%s; %s', self._state, self._sock) 488 return False 489 490 @_log_exceptions 491 def _report_completion(self, result): 492 """Advance to COMPLETED state, cancel async operation(s), and invoke 493 user's completion callback. 494 495 :param BaseException | tuple result: value to pass in user's callback. 496 `tuple(transport, protocol)` on success, exception on error 497 498 """ 499 _LOGGER.debug('_AsyncStreamConnector._report_completion(%r); %s', 500 result, self._sock) 501 502 assert isinstance(result, (BaseException, tuple)), ( 503 '_AsyncStreamConnector._report_completion() expected exception or ' 504 'tuple as result.', result, self._state) 505 assert self._state == self._STATE_ACTIVE, ( 506 '_AsyncStreamConnector._report_completion() expected ' 507 '_STATE_ACTIVE', self._state) 508 509 self._state = self._STATE_COMPLETED 510 511 # Notify user 512 try: 513 self._on_done(result) 514 except Exception: 515 _LOGGER.exception('%r: _on_done(%r) failed.', 516 self._report_completion, result) 517 raise 518 finally: 519 # NOTE: Close the socket on error, since we took ownership of it 520 self._cleanup(close=isinstance(result, BaseException)) 521 522 @_log_exceptions 523 def _start_async(self): 524 """Called as callback from I/O loop to kick-start the workflow, so it's 525 safe to call user's completion callback from here if needed 526 527 """ 528 _LOGGER.debug('_AsyncStreamConnector._start_async(); %s', self._sock) 529 530 if self._state != self._STATE_ACTIVE: 531 # Must have been canceled by user before we were called 532 _LOGGER.debug( 533 'Abandoning streaming linkup due to inactive state ' 534 'transition; state=%s; %s; .', self._state, self._sock) 535 return 536 537 # Link up protocol and transport if this is a plaintext linkup; 538 # otherwise kick-off SSL workflow first 539 if self._ssl_context is None: 540 self._linkup() 541 else: 542 _LOGGER.debug('Starting SSL handshake on %s', self._sock) 543 544 # Wrap our plain socket in ssl socket 545 try: 546 self._sock = self._ssl_context.wrap_socket( 547 self._sock, 548 server_side=False, 549 do_handshake_on_connect=False, 550 suppress_ragged_eofs=False, # False = error on incoming EOF 551 server_hostname=self._server_hostname) 552 except Exception as error: # pylint: disable=W0703 553 _LOGGER.exception('SSL wrap_socket(%s) failed: %r', self._sock, 554 error) 555 self._report_completion(error) 556 return 557 558 self._do_ssl_handshake() 559 560 @_log_exceptions 561 def _linkup(self): 562 """Connection is ready: instantiate and link up transport and protocol, 563 and invoke user's completion callback. 564 565 """ 566 _LOGGER.debug('_AsyncStreamConnector._linkup()') 567 568 transport = None 569 570 try: 571 # Create the protocol 572 try: 573 protocol = self._protocol_factory() 574 except Exception as error: 575 _LOGGER.exception('protocol_factory() failed: error=%r; %s', 576 error, self._sock) 577 raise 578 579 if self._ssl_context is None: 580 # Create plaintext streaming transport 581 try: 582 transport = _AsyncPlaintextTransport( 583 self._sock, protocol, self._nbio) 584 except Exception as error: 585 _LOGGER.exception('PlainTransport() failed: error=%r; %s', 586 error, self._sock) 587 raise 588 else: 589 # Create SSL streaming transport 590 try: 591 transport = _AsyncSSLTransport(self._sock, protocol, 592 self._nbio) 593 except Exception as error: 594 _LOGGER.exception('SSLTransport() failed: error=%r; %s', 595 error, self._sock) 596 raise 597 598 _LOGGER.debug('_linkup(): created transport %r', transport) 599 600 # Acquaint protocol with its transport 601 try: 602 protocol.connection_made(transport) 603 except Exception as error: 604 _LOGGER.exception( 605 'protocol.connection_made(%r) failed: error=%r; %s', 606 transport, error, self._sock) 607 raise 608 609 _LOGGER.debug('_linkup(): introduced transport to protocol %r; %r', 610 transport, protocol) 611 except Exception as error: # pylint: disable=W0703 612 result = error 613 else: 614 result = (transport, protocol) 615 616 self._report_completion(result) 617 618 @_log_exceptions 619 def _do_ssl_handshake(self): 620 """Perform asynchronous SSL handshake on the already wrapped socket 621 622 """ 623 _LOGGER.debug('_AsyncStreamConnector._do_ssl_handshake()') 624 625 if self._state != self._STATE_ACTIVE: 626 _LOGGER.debug( 627 '_do_ssl_handshake: Abandoning streaming linkup due ' 628 'to inactive state transition; state=%s; %s; .', self._state, 629 self._sock) 630 return 631 632 done = False 633 634 try: 635 try: 636 self._sock.do_handshake() 637 except ssl.SSLError as error: 638 if error.errno == ssl.SSL_ERROR_WANT_READ: 639 _LOGGER.debug('SSL handshake wants read; %s.', self._sock) 640 self._watching_socket = True 641 self._nbio.set_reader(self._sock.fileno(), 642 self._do_ssl_handshake) 643 self._nbio.remove_writer(self._sock.fileno()) 644 elif error.errno == ssl.SSL_ERROR_WANT_WRITE: 645 _LOGGER.debug('SSL handshake wants write. %s', self._sock) 646 self._watching_socket = True 647 self._nbio.set_writer(self._sock.fileno(), 648 self._do_ssl_handshake) 649 self._nbio.remove_reader(self._sock.fileno()) 650 else: 651 # Outer catch will report it 652 raise 653 else: 654 done = True 655 _LOGGER.info('SSL handshake completed successfully: %s', 656 self._sock) 657 except Exception as error: # pylint: disable=W0703 658 _LOGGER.exception('SSL do_handshake failed: error=%r; %s', error, 659 self._sock) 660 self._report_completion(error) 661 return 662 663 if done: 664 # Suspend I/O and link up transport with protocol 665 _LOGGER.debug( 666 '_do_ssl_handshake: removing watchers ahead of linkup: %s', 667 self._sock) 668 self._nbio.remove_reader(self._sock.fileno()) 669 self._nbio.remove_writer(self._sock.fileno()) 670 # So that our `_cleanup()` won't interfere with the transport's 671 # socket watcher configuration. 672 self._watching_socket = False 673 _LOGGER.debug( 674 '_do_ssl_handshake: pre-linkup removal of watchers is done; %s', 675 self._sock) 676 677 self._linkup() 678 679 680class _AsyncTransportBase( # pylint: disable=W0223 681 AbstractStreamTransport): 682 """Base class for `_AsyncPlaintextTransport` and `_AsyncSSLTransport`. 683 684 """ 685 686 _STATE_ACTIVE = 1 687 _STATE_FAILED = 2 # connection failed 688 _STATE_ABORTED_BY_USER = 3 # cancel() called 689 _STATE_COMPLETED = 4 # done with connection 690 691 _MAX_RECV_BYTES = 4096 # per socket.recv() documentation recommendation 692 693 # Max per consume call to prevent event starvation 694 _MAX_CONSUME_BYTES = 1024 * 100 695 696 class RxEndOfFile(OSError): 697 """We raise this internally when EOF (empty read) is detected on input. 698 699 """ 700 701 def __init__(self): 702 super(_AsyncTransportBase.RxEndOfFile, self).__init__( 703 -1, 'End of input stream (EOF)') 704 705 def __init__(self, sock, protocol, nbio): 706 """ 707 708 :param socket.socket | ssl.SSLSocket sock: connected socket 709 :param pika.adapters.utils.nbio_interface.AbstractStreamProtocol protocol: 710 corresponding protocol in this transport/protocol pairing; the 711 protocol already had its `connection_made()` method called. 712 :param AbstractIOServices | AbstractFileDescriptorServices nbio: 713 714 """ 715 _LOGGER.debug('_AsyncTransportBase.__init__: %s', sock) 716 self._sock = sock 717 self._protocol = protocol 718 self._nbio = nbio 719 720 self._state = self._STATE_ACTIVE 721 self._tx_buffers = collections.deque() 722 self._tx_buffered_byte_count = 0 723 724 def abort(self): 725 """Close connection abruptly without waiting for pending I/O to 726 complete. Will invoke the corresponding protocol's `connection_lost()` 727 method asynchronously (not in context of the abort() call). 728 729 :raises Exception: Exception-based exception on error 730 """ 731 _LOGGER.info('Aborting transport connection: state=%s; %s', self._state, 732 self._sock) 733 734 self._initiate_abort(None) 735 736 def get_protocol(self): 737 """Return the protocol linked to this transport. 738 739 :rtype: pika.adapters.utils.nbio_interface.AbstractStreamProtocol 740 """ 741 return self._protocol 742 743 def get_write_buffer_size(self): 744 """ 745 :returns: Current size of output data buffered by the transport 746 :rtype: int 747 """ 748 return self._tx_buffered_byte_count 749 750 def _buffer_tx_data(self, data): 751 """Buffer the given data until it can be sent asynchronously. 752 753 :param bytes data: 754 :raises ValueError: if called with empty data 755 756 """ 757 if not data: 758 _LOGGER.error('write() called with empty data: state=%s; %s', 759 self._state, self._sock) 760 raise ValueError('write() called with empty data {!r}'.format(data)) 761 762 if self._state != self._STATE_ACTIVE: 763 _LOGGER.debug( 764 'Ignoring write() called during inactive state: ' 765 'state=%s; %s', self._state, self._sock) 766 return 767 768 self._tx_buffers.append(data) 769 self._tx_buffered_byte_count += len(data) 770 771 def _consume(self): 772 """Utility method for use by subclasses to ingest data from socket and 773 dispatch it to protocol's `data_received()` method socket-specific 774 "try again" exception, per-event data consumption limit is reached, 775 transport becomes inactive, or a fatal failure. 776 777 Consumes up to `self._MAX_CONSUME_BYTES` to prevent event starvation or 778 until state becomes inactive (e.g., `protocol.data_received()` callback 779 aborts the transport) 780 781 :raises: Whatever the corresponding `sock.recv()` raises except the 782 socket error with errno.EINTR 783 :raises: Whatever the `protocol.data_received()` callback raises 784 :raises _AsyncTransportBase.RxEndOfFile: upon shutdown of input stream 785 786 """ 787 bytes_consumed = 0 788 789 while (self._state == self._STATE_ACTIVE and 790 bytes_consumed < self._MAX_CONSUME_BYTES): 791 data = self._sigint_safe_recv(self._sock, self._MAX_RECV_BYTES) 792 bytes_consumed += len(data) 793 794 # Empty data, should disconnect 795 if not data: 796 _LOGGER.error('Socket EOF; %s', self._sock) 797 raise self.RxEndOfFile() 798 799 # Pass the data to the protocol 800 try: 801 self._protocol.data_received(data) 802 except Exception as error: 803 _LOGGER.exception( 804 'protocol.data_received() failed: error=%r; %s', error, 805 self._sock) 806 raise 807 808 def _produce(self): 809 """Utility method for use by subclasses to emit data from tx_buffers. 810 This method sends chunks from `tx_buffers` until all chunks are 811 exhausted or sending is interrupted by an exception. Maintains integrity 812 of `self.tx_buffers`. 813 814 :raises: whatever the corresponding `sock.send()` raises except the 815 socket error with errno.EINTR 816 817 """ 818 while self._tx_buffers: 819 num_bytes_sent = self._sigint_safe_send(self._sock, 820 self._tx_buffers[0]) 821 822 chunk = self._tx_buffers.popleft() 823 if num_bytes_sent < len(chunk): 824 _LOGGER.debug('Partial send, requeing remaining data; %s of %s', 825 num_bytes_sent, len(chunk)) 826 self._tx_buffers.appendleft(chunk[num_bytes_sent:]) 827 828 self._tx_buffered_byte_count -= num_bytes_sent 829 assert self._tx_buffered_byte_count >= 0, ( 830 '_AsyncTransportBase._produce() tx buffer size underflow', 831 self._tx_buffered_byte_count, self._state) 832 833 @staticmethod 834 @_retry_on_sigint 835 def _sigint_safe_recv(sock, max_bytes): 836 """Receive data from socket, retrying on SIGINT. 837 838 :param sock: stream or SSL socket 839 :param max_bytes: maximum number of bytes to receive 840 :returns: received data or empty bytes uppon end of file 841 :rtype: bytes 842 :raises: whatever the corresponding `sock.recv()` raises except socket 843 error with errno.EINTR 844 845 """ 846 return sock.recv(max_bytes) 847 848 @staticmethod 849 @_retry_on_sigint 850 def _sigint_safe_send(sock, data): 851 """Send data to socket, retrying on SIGINT. 852 853 :param sock: stream or SSL socket 854 :param data: data bytes to send 855 :returns: number of bytes actually sent 856 :rtype: int 857 :raises: whatever the corresponding `sock.send()` raises except socket 858 error with errno.EINTR 859 860 """ 861 return sock.send(data) 862 863 @_log_exceptions 864 def _deactivate(self): 865 """Unregister the transport from I/O events 866 867 """ 868 if self._state == self._STATE_ACTIVE: 869 _LOGGER.info('Deactivating transport: state=%s; %s', self._state, 870 self._sock) 871 self._nbio.remove_reader(self._sock.fileno()) 872 self._nbio.remove_writer(self._sock.fileno()) 873 self._tx_buffers.clear() 874 875 @_log_exceptions 876 def _close_and_finalize(self): 877 """Close the transport's socket and unlink the transport it from 878 references to other assets (protocol, etc.) 879 880 """ 881 if self._state != self._STATE_COMPLETED: 882 _LOGGER.info('Closing transport socket and unlinking: state=%s; %s', 883 self._state, self._sock) 884 try: 885 self._sock.shutdown(socket.SHUT_RDWR) 886 except pika.compat.SOCKET_ERROR: 887 pass 888 self._sock.close() 889 self._sock = None 890 self._protocol = None 891 self._nbio = None 892 self._state = self._STATE_COMPLETED 893 894 @_log_exceptions 895 def _initiate_abort(self, error): 896 """Initiate asynchronous abort of the transport that concludes with a 897 call to the protocol's `connection_lost()` method. No flushing of 898 output buffers will take place. 899 900 :param BaseException | None error: None if being canceled by user, 901 including via falsie return value from protocol.eof_received; 902 otherwise the exception corresponding to the the failed connection. 903 """ 904 _LOGGER.info( 905 '_AsyncTransportBase._initate_abort(): Initiating abrupt ' 906 'asynchronous transport shutdown: state=%s; error=%r; %s', 907 self._state, error, self._sock) 908 909 assert self._state != self._STATE_COMPLETED, ( 910 '_AsyncTransportBase._initate_abort() expected ' 911 'non-_STATE_COMPLETED', self._state) 912 913 if self._state == self._STATE_COMPLETED: 914 return 915 916 self._deactivate() 917 918 # Update state 919 if error is None: 920 # Being aborted by user 921 922 if self._state == self._STATE_ABORTED_BY_USER: 923 # Abort by user already pending 924 _LOGGER.debug('_AsyncTransportBase._initiate_abort(): ' 925 'ignoring - user-abort already pending.') 926 return 927 928 # Notification priority is given to user-initiated abort over 929 # failed connection 930 self._state = self._STATE_ABORTED_BY_USER 931 else: 932 # Connection failed 933 934 if self._state != self._STATE_ACTIVE: 935 assert self._state == self._STATE_ABORTED_BY_USER, ( 936 '_AsyncTransportBase._initate_abort() expected ' 937 '_STATE_ABORTED_BY_USER', self._state) 938 return 939 940 self._state = self._STATE_FAILED 941 942 # Schedule callback from I/O loop to avoid potential reentry into user 943 # code 944 self._nbio.add_callback_threadsafe( 945 functools.partial(self._connection_lost_notify_async, error)) 946 947 @_log_exceptions 948 def _connection_lost_notify_async(self, error): 949 """Handle aborting of transport either due to socket error or user- 950 initiated `abort()` call. Must be called from an I/O loop callback owned 951 by us in order to avoid reentry into user code from user's API call into 952 the transport. 953 954 :param BaseException | None error: None if being canceled by user; 955 otherwise the exception corresponding to the the failed connection. 956 """ 957 _LOGGER.debug('Concluding transport shutdown: state=%s; error=%r', 958 self._state, error) 959 960 if self._state == self._STATE_COMPLETED: 961 return 962 963 if error is not None and self._state != self._STATE_FAILED: 964 # Priority is given to user-initiated abort notification 965 assert self._state == self._STATE_ABORTED_BY_USER, ( 966 '_AsyncTransportBase._connection_lost_notify_async() ' 967 'expected _STATE_ABORTED_BY_USER', self._state) 968 return 969 970 # Inform protocol 971 try: 972 self._protocol.connection_lost(error) 973 except Exception as exc: # pylint: disable=W0703 974 _LOGGER.exception('protocol.connection_lost(%r) failed: exc=%r; %s', 975 error, exc, self._sock) 976 # Re-raise, since we've exhausted our normal failure notification 977 # mechanism (i.e., connection_lost()) 978 raise 979 finally: 980 self._close_and_finalize() 981 982 983class _AsyncPlaintextTransport(_AsyncTransportBase): 984 """Implementation of `nbio_interface.AbstractStreamTransport` for a 985 plaintext connection. 986 987 """ 988 989 def __init__(self, sock, protocol, nbio): 990 """ 991 992 :param socket.socket sock: non-blocking connected socket 993 :param pika.adapters.utils.nbio_interface.AbstractStreamProtocol protocol: 994 corresponding protocol in this transport/protocol pairing; the 995 protocol already had its `connection_made()` method called. 996 :param AbstractIOServices | AbstractFileDescriptorServices nbio: 997 998 """ 999 super(_AsyncPlaintextTransport, self).__init__(sock, protocol, nbio) 1000 1001 # Request to be notified of incoming data; we'll watch for writability 1002 # only when our write buffer is non-empty 1003 self._nbio.set_reader(self._sock.fileno(), self._on_socket_readable) 1004 1005 def write(self, data): 1006 """Buffer the given data until it can be sent asynchronously. 1007 1008 :param bytes data: 1009 :raises ValueError: if called with empty data 1010 1011 """ 1012 if self._state != self._STATE_ACTIVE: 1013 _LOGGER.debug( 1014 'Ignoring write() called during inactive state: ' 1015 'state=%s; %s', self._state, self._sock) 1016 return 1017 1018 assert data, ('_AsyncPlaintextTransport.write(): empty data from user.', 1019 data, self._state) 1020 1021 if not self.get_write_buffer_size(): 1022 self._nbio.set_writer(self._sock.fileno(), self._on_socket_writable) 1023 _LOGGER.debug('Turned on writability watcher: %s', self._sock) 1024 1025 self._buffer_tx_data(data) 1026 1027 @_log_exceptions 1028 def _on_socket_readable(self): 1029 """Ingest data from socket and dispatch it to protocol until exception 1030 occurs (typically EAGAIN or EWOULDBLOCK), per-event data consumption 1031 limit is reached, transport becomes inactive, or failure. 1032 1033 """ 1034 if self._state != self._STATE_ACTIVE: 1035 _LOGGER.debug( 1036 'Ignoring readability notification due to inactive ' 1037 'state: state=%s; %s', self._state, self._sock) 1038 return 1039 1040 try: 1041 self._consume() 1042 except self.RxEndOfFile: 1043 try: 1044 keep_open = self._protocol.eof_received() 1045 except Exception as error: # pylint: disable=W0703 1046 _LOGGER.exception( 1047 'protocol.eof_received() failed: error=%r; %s', error, 1048 self._sock) 1049 self._initiate_abort(error) 1050 else: 1051 if keep_open: 1052 _LOGGER.info( 1053 'protocol.eof_received() elected to keep open: %s', 1054 self._sock) 1055 self._nbio.remove_reader(self._sock.fileno()) 1056 else: 1057 _LOGGER.info('protocol.eof_received() elected to close: %s', 1058 self._sock) 1059 self._initiate_abort(None) 1060 except (Exception, pika.compat.SOCKET_ERROR) as error: # pylint: disable=W0703 1061 if (isinstance(error, pika.compat.SOCKET_ERROR) and 1062 error.errno in _TRY_IO_AGAIN_SOCK_ERROR_CODES): 1063 _LOGGER.debug('Recv would block on %s', self._sock) 1064 else: 1065 _LOGGER.exception( 1066 '_AsyncBaseTransport._consume() failed, aborting ' 1067 'connection: error=%r; sock=%s; Caller\'s stack:\n%s', 1068 error, self._sock, ''.join( 1069 traceback.format_exception(*sys.exc_info()))) 1070 self._initiate_abort(error) 1071 else: 1072 if self._state != self._STATE_ACTIVE: 1073 # Most likely our protocol's `data_received()` aborted the 1074 # transport 1075 _LOGGER.debug( 1076 'Leaving Plaintext consumer due to inactive ' 1077 'state: state=%s; %s', self._state, self._sock) 1078 1079 @_log_exceptions 1080 def _on_socket_writable(self): 1081 """Handle writable socket notification 1082 1083 """ 1084 if self._state != self._STATE_ACTIVE: 1085 _LOGGER.debug( 1086 'Ignoring writability notification due to inactive ' 1087 'state: state=%s; %s', self._state, self._sock) 1088 return 1089 1090 # We shouldn't be getting called with empty tx buffers 1091 assert self._tx_buffers, ( 1092 '_AsyncPlaintextTransport._on_socket_writable() called, ' 1093 'but _tx_buffers is empty.', self._state) 1094 1095 try: 1096 # Transmit buffered data to remote socket 1097 self._produce() 1098 except (Exception, pika.compat.SOCKET_ERROR) as error: # pylint: disable=W0703 1099 if (isinstance(error, pika.compat.SOCKET_ERROR) and 1100 error.errno in _TRY_IO_AGAIN_SOCK_ERROR_CODES): 1101 _LOGGER.debug('Send would block on %s', self._sock) 1102 else: 1103 _LOGGER.exception( 1104 '_AsyncBaseTransport._produce() failed, aborting ' 1105 'connection: error=%r; sock=%s; Caller\'s stack:\n%s', 1106 error, self._sock, ''.join( 1107 traceback.format_exception(*sys.exc_info()))) 1108 self._initiate_abort(error) 1109 else: 1110 if not self._tx_buffers: 1111 self._nbio.remove_writer(self._sock.fileno()) 1112 _LOGGER.debug('Turned off writability watcher: %s', self._sock) 1113 1114 1115class _AsyncSSLTransport(_AsyncTransportBase): 1116 """Implementation of `.nbio_interface.AbstractStreamTransport` for an SSL 1117 connection. 1118 1119 """ 1120 1121 def __init__(self, sock, protocol, nbio): 1122 """ 1123 1124 :param ssl.SSLSocket sock: non-blocking connected socket 1125 :param pika.adapters.utils.nbio_interface.AbstractStreamProtocol protocol: 1126 corresponding protocol in this transport/protocol pairing; the 1127 protocol already had its `connection_made()` method called. 1128 :param AbstractIOServices | AbstractFileDescriptorServices nbio: 1129 1130 """ 1131 super(_AsyncSSLTransport, self).__init__(sock, protocol, nbio) 1132 1133 self._ssl_readable_action = self._consume 1134 self._ssl_writable_action = None 1135 1136 # Bootstrap consumer; we'll take care of producer once data is buffered 1137 self._nbio.set_reader(self._sock.fileno(), self._on_socket_readable) 1138 # Try reading asap just in case read-ahead caused some 1139 self._nbio.add_callback_threadsafe(self._on_socket_readable) 1140 1141 def write(self, data): 1142 """Buffer the given data until it can be sent asynchronously. 1143 1144 :param bytes data: 1145 :raises ValueError: if called with empty data 1146 1147 """ 1148 if self._state != self._STATE_ACTIVE: 1149 _LOGGER.debug( 1150 'Ignoring write() called during inactive state: ' 1151 'state=%s; %s', self._state, self._sock) 1152 return 1153 1154 tx_buffer_was_empty = self.get_write_buffer_size() == 0 1155 1156 self._buffer_tx_data(data) 1157 1158 if tx_buffer_was_empty and self._ssl_writable_action is None: 1159 self._ssl_writable_action = self._produce 1160 self._nbio.set_writer(self._sock.fileno(), self._on_socket_writable) 1161 _LOGGER.debug('Turned on writability watcher: %s', self._sock) 1162 1163 @_log_exceptions 1164 def _on_socket_readable(self): 1165 """Handle readable socket indication 1166 1167 """ 1168 if self._state != self._STATE_ACTIVE: 1169 _LOGGER.debug( 1170 'Ignoring readability notification due to inactive ' 1171 'state: state=%s; %s', self._state, self._sock) 1172 return 1173 1174 if self._ssl_readable_action: 1175 try: 1176 self._ssl_readable_action() 1177 except Exception as error: # pylint: disable=W0703 1178 self._initiate_abort(error) 1179 else: 1180 _LOGGER.debug( 1181 'SSL readable action was suppressed: ' 1182 'ssl_writable_action=%r; %s', self._ssl_writable_action, 1183 self._sock) 1184 1185 @_log_exceptions 1186 def _on_socket_writable(self): 1187 """Handle writable socket notification 1188 1189 """ 1190 if self._state != self._STATE_ACTIVE: 1191 _LOGGER.debug( 1192 'Ignoring writability notification due to inactive ' 1193 'state: state=%s; %s', self._state, self._sock) 1194 return 1195 1196 if self._ssl_writable_action: 1197 try: 1198 self._ssl_writable_action() 1199 except Exception as error: # pylint: disable=W0703 1200 self._initiate_abort(error) 1201 else: 1202 _LOGGER.debug( 1203 'SSL writable action was suppressed: ' 1204 'ssl_readable_action=%r; %s', self._ssl_readable_action, 1205 self._sock) 1206 1207 @_log_exceptions 1208 def _consume(self): 1209 """[override] Ingest data from socket and dispatch it to protocol until 1210 exception occurs (typically ssl.SSLError with 1211 SSL_ERROR_WANT_READ/WRITE), per-event data consumption limit is reached, 1212 transport becomes inactive, or failure. 1213 1214 Update consumer/producer registration. 1215 1216 :raises Exception: error that signals that connection needs to be 1217 aborted 1218 """ 1219 next_consume_on_readable = True 1220 1221 try: 1222 super(_AsyncSSLTransport, self)._consume() 1223 except ssl.SSLError as error: 1224 if error.errno == ssl.SSL_ERROR_WANT_READ: 1225 _LOGGER.debug('SSL ingester wants read: %s', self._sock) 1226 elif error.errno == ssl.SSL_ERROR_WANT_WRITE: 1227 # Looks like SSL re-negotiation 1228 _LOGGER.debug('SSL ingester wants write: %s', self._sock) 1229 next_consume_on_readable = False 1230 else: 1231 _LOGGER.exception( 1232 '_AsyncBaseTransport._consume() failed, aborting ' 1233 'connection: error=%r; sock=%s; Caller\'s stack:\n%s', 1234 error, self._sock, ''.join( 1235 traceback.format_exception(*sys.exc_info()))) 1236 raise # let outer catch block abort the transport 1237 else: 1238 if self._state != self._STATE_ACTIVE: 1239 # Most likely our protocol's `data_received()` aborted the 1240 # transport 1241 _LOGGER.debug( 1242 'Leaving SSL consumer due to inactive ' 1243 'state: state=%s; %s', self._state, self._sock) 1244 return 1245 1246 # Consumer exited without exception; there may still be more, 1247 # possibly unprocessed, data records in SSL input buffers that 1248 # can be read without waiting for socket to become readable. 1249 1250 # In case buffered input SSL data records still remain 1251 self._nbio.add_callback_threadsafe(self._on_socket_readable) 1252 1253 # Update consumer registration 1254 if next_consume_on_readable: 1255 if not self._ssl_readable_action: 1256 self._nbio.set_reader(self._sock.fileno(), 1257 self._on_socket_readable) 1258 self._ssl_readable_action = self._consume 1259 1260 # NOTE: can't use identity check, it fails for instance methods 1261 if self._ssl_writable_action == self._consume: # pylint: disable=W0143 1262 self._nbio.remove_writer(self._sock.fileno()) 1263 self._ssl_writable_action = None 1264 else: 1265 # WANT_WRITE 1266 if not self._ssl_writable_action: 1267 self._nbio.set_writer(self._sock.fileno(), 1268 self._on_socket_writable) 1269 self._ssl_writable_action = self._consume 1270 1271 if self._ssl_readable_action: 1272 self._nbio.remove_reader(self._sock.fileno()) 1273 self._ssl_readable_action = None 1274 1275 # Update producer registration 1276 if self._tx_buffers and not self._ssl_writable_action: 1277 self._ssl_writable_action = self._produce 1278 self._nbio.set_writer(self._sock.fileno(), self._on_socket_writable) 1279 1280 @_log_exceptions 1281 def _produce(self): 1282 """[override] Emit data from tx_buffers all chunks are exhausted or 1283 sending is interrupted by an exception (typically ssl.SSLError with 1284 SSL_ERROR_WANT_READ/WRITE). 1285 1286 Update consumer/producer registration. 1287 1288 :raises Exception: error that signals that connection needs to be 1289 aborted 1290 1291 """ 1292 next_produce_on_writable = None # None means no need to produce 1293 1294 try: 1295 super(_AsyncSSLTransport, self)._produce() 1296 except ssl.SSLError as error: 1297 if error.errno == ssl.SSL_ERROR_WANT_READ: 1298 # Looks like SSL re-negotiation 1299 _LOGGER.debug('SSL emitter wants read: %s', self._sock) 1300 next_produce_on_writable = False 1301 elif error.errno == ssl.SSL_ERROR_WANT_WRITE: 1302 _LOGGER.debug('SSL emitter wants write: %s', self._sock) 1303 next_produce_on_writable = True 1304 else: 1305 _LOGGER.exception( 1306 '_AsyncBaseTransport._produce() failed, aborting ' 1307 'connection: error=%r; sock=%s; Caller\'s stack:\n%s', 1308 error, self._sock, ''.join( 1309 traceback.format_exception(*sys.exc_info()))) 1310 raise # let outer catch block abort the transport 1311 else: 1312 # No exception, so everything must have been written to the socket 1313 assert not self._tx_buffers, ( 1314 '_AsyncSSLTransport._produce(): no exception from parent ' 1315 'class, but data remains in _tx_buffers.', len( 1316 self._tx_buffers)) 1317 1318 # Update producer registration 1319 if self._tx_buffers: 1320 assert next_produce_on_writable is not None, ( 1321 '_AsyncSSLTransport._produce(): next_produce_on_writable is ' 1322 'still None', self._state) 1323 1324 if next_produce_on_writable: 1325 if not self._ssl_writable_action: 1326 self._nbio.set_writer(self._sock.fileno(), 1327 self._on_socket_writable) 1328 self._ssl_writable_action = self._produce 1329 1330 # NOTE: can't use identity check, it fails for instance methods 1331 if self._ssl_readable_action == self._produce: # pylint: disable=W0143 1332 self._nbio.remove_reader(self._sock.fileno()) 1333 self._ssl_readable_action = None 1334 else: 1335 # WANT_READ 1336 if not self._ssl_readable_action: 1337 self._nbio.set_reader(self._sock.fileno(), 1338 self._on_socket_readable) 1339 self._ssl_readable_action = self._produce 1340 1341 if self._ssl_writable_action: 1342 self._nbio.remove_writer(self._sock.fileno()) 1343 self._ssl_writable_action = None 1344 else: 1345 # NOTE: can't use identity check, it fails for instance methods 1346 if self._ssl_readable_action == self._produce: # pylint: disable=W0143 1347 self._nbio.remove_reader(self._sock.fileno()) 1348 self._ssl_readable_action = None 1349 assert self._ssl_writable_action != self._produce, ( # pylint: disable=W0143 1350 '_AsyncSSLTransport._produce(): with empty tx_buffers, ' 1351 'writable_action cannot be _produce when readable is ' 1352 '_produce', self._state) 1353 else: 1354 # NOTE: can't use identity check, it fails for instance methods 1355 assert self._ssl_writable_action == self._produce, ( # pylint: disable=W0143 1356 '_AsyncSSLTransport._produce(): with empty tx_buffers, ' 1357 'expected writable_action as _produce when readable_action ' 1358 'is not _produce', 'writable_action:', 1359 self._ssl_writable_action, 'readable_action:', 1360 self._ssl_readable_action, 'state:', self._state) 1361 self._ssl_writable_action = None 1362 self._nbio.remove_writer(self._sock.fileno()) 1363 1364 # Update consumer registration 1365 if not self._ssl_readable_action: 1366 self._ssl_readable_action = self._consume 1367 self._nbio.set_reader(self._sock.fileno(), self._on_socket_readable) 1368 # In case input SSL data records have been buffered 1369 self._nbio.add_callback_threadsafe(self._on_socket_readable) 1370 elif self._sock.pending(): 1371 self._nbio.add_callback_threadsafe(self._on_socket_readable) 1372