"""
Implementation of `nbio_interface.AbstractIOServices` on top of a
selector-based I/O loop, such as tornado's and our home-grown
select_connection's I/O loops.

"""
import abc
import logging
import socket
import threading

from pika.adapters.utils import nbio_interface, io_services_utils
from pika.adapters.utils.io_services_utils import (check_callback_arg,
                                                   check_fd_arg)

LOGGER = logging.getLogger(__name__)


class AbstractSelectorIOLoop(object):
    """Selector-based I/O loop interface expected by
    `selector_ioloop_adapter.SelectorIOServicesAdapter`

    NOTE: this interface follows the corresponding methods and attributes
     of `tornado.ioloop.IOLoop` in order to avoid additional adapter layering
     when wrapping tornado's IOLoop.

    NOTE: this class derives from plain `object` (no `abc.ABCMeta`
     metaclass), so the `abc.abstractmethod` markers below document the
     expected interface but are not enforced at instantiation time.
    """

    @property
    @abc.abstractmethod
    def READ(self):  # pylint: disable=C0103
        """The value of the I/O loop's READ flag; READ/WRITE/ERROR may be used
        with bitwise operators as expected.

        Implementation note: the implementations can simply replace these
        READ/WRITE/ERROR properties with class-level attributes

        """

    @property
    @abc.abstractmethod
    def WRITE(self):  # pylint: disable=C0103
        """The value of the I/O loop's WRITE flag; READ/WRITE/ERROR may be used
        with bitwise operators as expected

        """

    @property
    @abc.abstractmethod
    def ERROR(self):  # pylint: disable=C0103
        """The value of the I/O loop's ERROR flag; READ/WRITE/ERROR may be used
        with bitwise operators as expected

        """

    @abc.abstractmethod
    def close(self):
        """Release IOLoop's resources.

        the `close()` method is intended to be called by the application or test
        code only after `start()` returns. After calling `close()`, no other
        interaction with the closed instance of `IOLoop` should be performed.

        """

    @abc.abstractmethod
    def start(self):
        """Run the I/O loop. It will loop until requested to exit. See `stop()`.

        """

    @abc.abstractmethod
    def stop(self):
        """Request exit from the ioloop. The loop is NOT guaranteed to
        stop before this method returns.

        To invoke `stop()` safely from a thread other than this IOLoop's thread,
        call it via `add_callback`; e.g.,

            `ioloop.add_callback(ioloop.stop)`

        """

    @abc.abstractmethod
    def call_later(self, delay, callback):
        """Add the callback to the IOLoop timer to be called after delay seconds
        from the time of call on best-effort basis. Returns a handle to the
        timeout.

        :param float delay: The number of seconds to wait to call callback
        :param callable callback: The callback method
        :returns: handle to the created timeout that may be passed to
            `remove_timeout()`
        :rtype: object

        """

    @abc.abstractmethod
    def remove_timeout(self, timeout_handle):
        """Remove a timeout

        :param timeout_handle: Handle of timeout to remove

        """

    @abc.abstractmethod
    def add_callback(self, callback):
        """Requests a call to the given function as soon as possible in the
        context of this IOLoop's thread.

        NOTE: This is the only thread-safe method in IOLoop. All other
        manipulations of IOLoop must be performed from the IOLoop's thread.

        For example, a thread may request a call to the `stop` method of an
        ioloop that is running in a different thread via
        `ioloop.add_callback(ioloop.stop)`

        :param callable callback: The callback method

        """

    @abc.abstractmethod
    def add_handler(self, fd, handler, events):
        """Start watching the given file descriptor for events

        :param int fd: The file descriptor
        :param callable handler: When requested event(s) occur,
            `handler(fd, events)` will be called.
        :param int events: The event mask using READ, WRITE, ERROR.

        """

    @abc.abstractmethod
    def update_handler(self, fd, events):
        """Changes the events we watch for

        :param int fd: The file descriptor
        :param int events: The event mask using READ, WRITE, ERROR

        """

    @abc.abstractmethod
    def remove_handler(self, fd):
        """Stop watching the given file descriptor for events

        :param int fd: The file descriptor

        """


class SelectorIOServicesAdapter(io_services_utils.SocketConnectionMixin,
                                io_services_utils.StreamingConnectionMixin,
                                nbio_interface.AbstractIOServices,
                                nbio_interface.AbstractFileDescriptorServices):
    """Implements the
    :py:class:`.nbio_interface.AbstractIOServices` interface
    on top of selector-style native loop having the
    :py:class:`AbstractSelectorIOLoop` interface, such as
    :py:class:`pika.select_connection.IOLoop` and :py:class:`tornado.IOLoop`.

    NOTE:
    :py:class:`.nbio_interface.AbstractFileDescriptorServices`
    interface is only required by the mixins.

    """

    def __init__(self, native_loop):
        """
        :param AbstractSelectorIOLoop native_loop: An instance compatible with
            the `AbstractSelectorIOLoop` interface, but not necessarily derived
            from it.
        """
        self._loop = native_loop

        # Active watchers: maps file descriptors to `_FileDescriptorCallbacks`
        self._watchers = dict()

        # Native loop-specific event masks of interest
        self._readable_mask = self._loop.READ
        # NOTE: tying ERROR to WRITE is particularly handy for Windows, whose
        # `select.select()` differs from Posix by reporting
        # connection-establishment failure only through exceptfds (ERROR event),
        # while the typical application workflow is to wait for the socket to
        # become writable when waiting for socket connection to be established.
        self._writable_mask = self._loop.WRITE | self._loop.ERROR

    def get_native_ioloop(self):
        """Implement
        :py:meth:`.nbio_interface.AbstractIOServices.get_native_ioloop()`.

        """
        return self._loop

    def close(self):
        """Implement :py:meth:`.nbio_interface.AbstractIOServices.close()`.

        """
        self._loop.close()

    def run(self):
        """Implement :py:meth:`.nbio_interface.AbstractIOServices.run()`.

        """
        self._loop.start()

    def stop(self):
        """Implement :py:meth:`.nbio_interface.AbstractIOServices.stop()`.

        """
        self._loop.stop()

    def add_callback_threadsafe(self, callback):
        """Implement
        :py:meth:`.nbio_interface.AbstractIOServices.add_callback_threadsafe()`.

        """
        self._loop.add_callback(callback)

    def call_later(self, delay, callback):
        """Implement :py:meth:`.nbio_interface.AbstractIOServices.call_later()`.

        """
        return _TimerHandle(self._loop.call_later(delay, callback), self._loop)

    def getaddrinfo(self,
                    host,
                    port,
                    on_done,
                    family=0,
                    socktype=0,
                    proto=0,
                    flags=0):
        """Implement :py:meth:`.nbio_interface.AbstractIOServices.getaddrinfo()`.

        """
        resolver = _AddressResolver(
            native_loop=self._loop,
            host=host,
            port=port,
            family=family,
            socktype=socktype,
            proto=proto,
            flags=flags,
            on_done=on_done)

        # `start()` already returns a `_SelectorIOLoopIOHandle` bound to the
        # resolver, so there is no need to wrap it a second time.
        return resolver.start()

    def set_reader(self, fd, on_readable):
        """Implement
        :py:meth:`.nbio_interface.AbstractFileDescriptorServices.set_reader()`.

        """
        LOGGER.debug('SelectorIOServicesAdapter.set_reader(%s, %r)', fd,
                     on_readable)

        check_fd_arg(fd)
        check_callback_arg(on_readable, 'on_readable')

        try:
            callbacks = self._watchers[fd]
        except KeyError:
            # First watcher for this fd: register with the native loop before
            # recording it, so a failed registration leaves no stale entry.
            self._loop.add_handler(fd, self._on_reader_writer_fd_events,
                                   self._readable_mask)
            self._watchers[fd] = _FileDescriptorCallbacks(reader=on_readable)
            LOGGER.debug('set_reader(%s, _) added handler Rd', fd)
        else:
            if callbacks.reader is None:
                # An entry without a reader exists only because of a writer
                assert callbacks.writer is not None
                self._loop.update_handler(
                    fd, self._readable_mask | self._writable_mask)
                LOGGER.debug('set_reader(%s, _) updated handler RdWr', fd)
            else:
                LOGGER.debug('set_reader(%s, _) replacing reader', fd)

            callbacks.reader = on_readable

    def remove_reader(self, fd):
        """Implement
        :py:meth:`.nbio_interface.AbstractFileDescriptorServices.remove_reader()`.

        """
        LOGGER.debug('SelectorIOServicesAdapter.remove_reader(%s)', fd)

        check_fd_arg(fd)

        try:
            callbacks = self._watchers[fd]
        except KeyError:
            LOGGER.debug('remove_reader(%s) neither was set', fd)
            return False

        if callbacks.reader is None:
            assert callbacks.writer is not None
            LOGGER.debug('remove_reader(%s) reader wasn\'t set Wr', fd)
            return False

        callbacks.reader = None

        if callbacks.writer is None:
            # Nothing left to watch on this fd
            del self._watchers[fd]
            self._loop.remove_handler(fd)
            LOGGER.debug('remove_reader(%s) removed handler', fd)
        else:
            self._loop.update_handler(fd, self._writable_mask)
            LOGGER.debug('remove_reader(%s) updated handler Wr', fd)

        return True

    def set_writer(self, fd, on_writable):
        """Implement
        :py:meth:`.nbio_interface.AbstractFileDescriptorServices.set_writer()`.

        """
        LOGGER.debug('SelectorIOServicesAdapter.set_writer(%s, %r)', fd,
                     on_writable)

        check_fd_arg(fd)
        check_callback_arg(on_writable, 'on_writable')

        try:
            callbacks = self._watchers[fd]
        except KeyError:
            # First watcher for this fd: register with the native loop before
            # recording it, so a failed registration leaves no stale entry.
            self._loop.add_handler(fd, self._on_reader_writer_fd_events,
                                   self._writable_mask)
            self._watchers[fd] = _FileDescriptorCallbacks(writer=on_writable)
            LOGGER.debug('set_writer(%s, _) added handler Wr', fd)
        else:
            if callbacks.writer is None:
                # An entry without a writer exists only because of a reader
                assert callbacks.reader is not None
                self._loop.update_handler(
                    fd, self._readable_mask | self._writable_mask)
                LOGGER.debug('set_writer(%s, _) updated handler RdWr', fd)
            else:
                LOGGER.debug('set_writer(%s, _) replacing writer', fd)

            callbacks.writer = on_writable

    def remove_writer(self, fd):
        """Implement
        :py:meth:`.nbio_interface.AbstractFileDescriptorServices.remove_writer()`.

        """
        LOGGER.debug('SelectorIOServicesAdapter.remove_writer(%s)', fd)

        check_fd_arg(fd)

        try:
            callbacks = self._watchers[fd]
        except KeyError:
            LOGGER.debug('remove_writer(%s) neither was set.', fd)
            return False

        if callbacks.writer is None:
            assert callbacks.reader is not None
            LOGGER.debug('remove_writer(%s) writer wasn\'t set Rd', fd)
            return False

        callbacks.writer = None

        if callbacks.reader is None:
            # Nothing left to watch on this fd
            del self._watchers[fd]
            self._loop.remove_handler(fd)
            LOGGER.debug('remove_writer(%s) removed handler', fd)
        else:
            self._loop.update_handler(fd, self._readable_mask)
            LOGGER.debug('remove_writer(%s) updated handler Rd', fd)

        return True

    def _on_reader_writer_fd_events(self, fd, events):
        """Handle indicated file descriptor events requested via `set_reader()`
        and `set_writer()`.

        :param fd: file descriptor
        :param events: event mask using native loop's READ/WRITE/ERROR. NOTE:
            depending on the underlying poller mechanism, ERROR may be indicated
            upon certain file descriptor state. Because the writable mask used
            by this adapter is `WRITE | ERROR`, an ERROR indication is
            dispatched to the writer callback (or logged as a warning when no
            writer is registered).
        """
        callbacks = self._watchers[fd]

        if events & self._readable_mask and callbacks.reader is None:
            # NOTE: we check for consistency here ahead of the writer callback
            # because the writer callback, if any, can change the events being
            # watched
            LOGGER.warning(
                'READ indicated on fd=%s, but reader callback is None; '
                'events=%s', fd, bin(events))

        if events & self._writable_mask:
            if callbacks.writer is not None:
                callbacks.writer()
            else:
                LOGGER.warning(
                    'WRITE indicated on fd=%s, but writer callback is None; '
                    'events=%s', fd, bin(events))

        # The reader is re-checked here (rather than cached above) because the
        # reader callback might have been removed in the scope of the writer
        # callback; in that case there is nothing to invoke.
        if events & self._readable_mask and callbacks.reader is not None:
            callbacks.reader()


class _FileDescriptorCallbacks(object):
    """Holds reader and writer callbacks for a file descriptor"""

    __slots__ = ('reader', 'writer')

    def __init__(self, reader=None, writer=None):
        """
        :param callable | None reader: callback to invoke on readable events,
            or None if no reader is registered
        :param callable | None writer: callback to invoke on writable events,
            or None if no writer is registered
        """
        self.reader = reader
        self.writer = writer


class _TimerHandle(nbio_interface.AbstractTimerReference):
    """This module's adaptation of `nbio_interface.AbstractTimerReference`.

    """

    def __init__(self, handle, loop):
        """

        :param opaque handle: timer handle from the underlying loop
            implementation that may be passed to its `remove_timeout()` method
        :param AbstractSelectorIOLoop loop: the I/O loop instance that created
            the timeout.
        """
        self._handle = handle
        self._loop = loop

    def cancel(self):
        """Cancel the timer via the loop's `remove_timeout()` and drop the
        references to the handle and the loop. Calling it again is a no-op.

        """
        if self._loop is not None:
            self._loop.remove_timeout(self._handle)
            self._handle = None
            self._loop = None


class _SelectorIOLoopIOHandle(nbio_interface.AbstractIOReference):
    """This module's adaptation of `nbio_interface.AbstractIOReference`

    """

    def __init__(self, subject):
        """
        :param subject: subject of the reference containing a `cancel()` method

        """
        # Only the bound method is retained; it keeps the subject alive.
        self._cancel = subject.cancel

    def cancel(self):
        """Cancel pending operation

        :returns: False if was already done or cancelled; True otherwise
        :rtype: bool

        """
        return self._cancel()


class _AddressResolver(object):
    """Performs getaddrinfo asynchronously using a thread, then reports result
    via callback from the given I/O loop.

    NOTE: at this stage, we're using a thread per request, which may prove
    inefficient and even prohibitive if the app performs many of these
    operations concurrently.
    """
    NOT_STARTED = 0
    ACTIVE = 1
    CANCELED = 2
    COMPLETED = 3

    def __init__(self, native_loop, host, port, family, socktype, proto, flags,
                 on_done):
        """

        :param AbstractSelectorIOLoop native_loop:
        :param host: `see socket.getaddrinfo()`
        :param port: `see socket.getaddrinfo()`
        :param family: `see socket.getaddrinfo()`
        :param socktype: `see socket.getaddrinfo()`
        :param proto: `see socket.getaddrinfo()`
        :param flags: `see socket.getaddrinfo()`
        :param on_done: on_done(records|BaseException) callback for reporting
            result from the given I/O loop. The single arg will be either an
            exception object (check for `BaseException`) in case of failure or
            the result returned by `socket.getaddrinfo()`.
        """
        check_callback_arg(on_done, 'on_done')

        self._state = self.NOT_STARTED
        self._result = None
        self._loop = native_loop
        self._host = host
        self._port = port
        self._family = family
        self._socktype = socktype
        self._proto = proto
        self._flags = flags
        self._on_done = on_done

        # Guards `_state` and `_loop` against the resolver thread racing with
        # `cancel()`
        self._mutex = threading.Lock()
        self._threading_timer = None

    def _cleanup(self):
        """Release resources

        """
        self._loop = None
        self._threading_timer = None
        self._on_done = None

    def start(self):
        """Start asynchronous DNS lookup.

        :returns: reference whose `cancel()` cancels this resolver
        :rtype: nbio_interface.AbstractIOReference

        """
        assert self._state == self.NOT_STARTED, self._state

        self._state = self.ACTIVE
        # A zero-delay timer is used as a one-shot worker thread that also
        # offers a (best-effort) `cancel()`.
        self._threading_timer = threading.Timer(0, self._resolve)
        self._threading_timer.start()

        return _SelectorIOLoopIOHandle(self)

    def cancel(self):
        """Cancel the pending resolver

        :returns: False if was already done or cancelled; True otherwise
        :rtype: bool

        """
        # Try to cancel, but no guarantees
        with self._mutex:
            if self._state == self.ACTIVE:
                LOGGER.debug('Canceling resolver for %s:%s', self._host,
                             self._port)
                self._state = self.CANCELED

                # Attempt to cancel, but not guaranteed
                self._threading_timer.cancel()

                self._cleanup()

                return True
            else:
                LOGGER.debug(
                    'Ignoring _AddressResolver cancel request when not ACTIVE; '
                    '(%s:%s); state=%s', self._host, self._port, self._state)
                return False

    def _resolve(self):
        """Call `socket.getaddrinfo()` and return result via user's callback
        function on the given I/O loop

        """
        try:
            # NOTE: on python 2.x, can't pass keyword args to getaddrinfo()
            result = socket.getaddrinfo(self._host, self._port, self._family,
                                        self._socktype, self._proto,
                                        self._flags)
        except Exception as exc:  # pylint: disable=W0703
            # The failure is not swallowed: the exception object becomes the
            # result handed to the user's `on_done` callback.
            LOGGER.error('Address resolution failed: %r', exc)
            result = exc

        self._result = result

        # Schedule result to be returned to user via user's event loop
        with self._mutex:
            if self._state == self.ACTIVE:
                self._loop.add_callback(self._dispatch_result)
            else:
                LOGGER.debug(
                    'Asynchronous getaddrinfo cancellation detected; '
                    'in thread; host=%r', self._host)

    def _dispatch_result(self):
        """This is called from the user's I/O loop to pass the result to the
        user via the user's on_done callback

        """
        if self._state == self.ACTIVE:
            self._state = self.COMPLETED
            try:
                LOGGER.debug(
                    'Invoking asynchronous getaddrinfo() completion callback; '
                    'host=%r', self._host)
                self._on_done(self._result)
            finally:
                # Release references even if the user's callback raises
                self._cleanup()
        else:
            LOGGER.debug(
                'Asynchronous getaddrinfo cancellation detected; '
                'in I/O loop context; host=%r', self._host)