1"""Base implementation of event loop. 2 3The event loop can be broken up into a multiplexer (the part 4responsible for notifying us of I/O events) and the event loop proper, 5which wraps a multiplexer with functionality for scheduling callbacks, 6immediately or at a given time in the future. 7 8Whenever a public API takes a callback, subsequent positional 9arguments will be passed to the callback if/when it is called. This 10avoids the proliferation of trivial lambdas implementing closures. 11Keyword arguments for the callback are not supported; this is a 12conscious design decision, leaving the door open for keyword arguments 13to modify the meaning of the API call itself. 14""" 15 16import collections 17import collections.abc 18import concurrent.futures 19import functools 20import heapq 21import itertools 22import os 23import socket 24import stat 25import subprocess 26import threading 27import time 28import traceback 29import sys 30import warnings 31import weakref 32 33try: 34 import ssl 35except ImportError: # pragma: no cover 36 ssl = None 37 38from . import constants 39from . import coroutines 40from . import events 41from . import exceptions 42from . import futures 43from . import protocols 44from . import sslproto 45from . import staggered 46from . import tasks 47from . import transports 48from . import trsock 49from .log import logger 50 51 52__all__ = 'BaseEventLoop', 53 54 55# Minimum number of _scheduled timer handles before cleanup of 56# cancelled handles is performed. 57_MIN_SCHEDULED_TIMER_HANDLES = 100 58 59# Minimum fraction of _scheduled timer handles that are cancelled 60# before cleanup of cancelled handles is performed. 61_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5 62 63 64_HAS_IPv6 = hasattr(socket, 'AF_INET6') 65 66# Maximum timeout passed to select to avoid OS limitations 67MAXIMUM_SELECT_TIMEOUT = 24 * 3600 68 69# Used for deprecation and removal of `loop.create_datagram_endpoint()`'s 70# *reuse_address* parameter 71_unset = object() 72 73 74def _format_handle(handle): 75 cb = handle._callback 76 if isinstance(getattr(cb, '__self__', None), tasks.Task): 77 # format the task 78 return repr(cb.__self__) 79 else: 80 return str(handle) 81 82 83def _format_pipe(fd): 84 if fd == subprocess.PIPE: 85 return '<pipe>' 86 elif fd == subprocess.STDOUT: 87 return '<stdout>' 88 else: 89 return repr(fd) 90 91 92def _set_reuseport(sock): 93 if not hasattr(socket, 'SO_REUSEPORT'): 94 raise ValueError('reuse_port not supported by socket module') 95 else: 96 try: 97 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) 98 except OSError: 99 raise ValueError('reuse_port not supported by socket module, ' 100 'SO_REUSEPORT defined but not implemented.') 101 102 103def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0): 104 # Try to skip getaddrinfo if "host" is already an IP. Users might have 105 # handled name resolution in their own code and pass in resolved IPs. 106 if not hasattr(socket, 'inet_pton'): 107 return 108 109 if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \ 110 host is None: 111 return None 112 113 if type == socket.SOCK_STREAM: 114 proto = socket.IPPROTO_TCP 115 elif type == socket.SOCK_DGRAM: 116 proto = socket.IPPROTO_UDP 117 else: 118 return None 119 120 if port is None: 121 port = 0 122 elif isinstance(port, bytes) and port == b'': 123 port = 0 124 elif isinstance(port, str) and port == '': 125 port = 0 126 else: 127 # If port's a service name like "http", don't skip getaddrinfo. 128 try: 129 port = int(port) 130 except (TypeError, ValueError): 131 return None 132 133 if family == socket.AF_UNSPEC: 134 afs = [socket.AF_INET] 135 if _HAS_IPv6: 136 afs.append(socket.AF_INET6) 137 else: 138 afs = [family] 139 140 if isinstance(host, bytes): 141 host = host.decode('idna') 142 if '%' in host: 143 # Linux's inet_pton doesn't accept an IPv6 zone index after host, 144 # like '::1%lo0'. 145 return None 146 147 for af in afs: 148 try: 149 socket.inet_pton(af, host) 150 # The host has already been resolved. 151 if _HAS_IPv6 and af == socket.AF_INET6: 152 return af, type, proto, '', (host, port, flowinfo, scopeid) 153 else: 154 return af, type, proto, '', (host, port) 155 except OSError: 156 pass 157 158 # "host" is not an IP address. 159 return None 160 161 162def _interleave_addrinfos(addrinfos, first_address_family_count=1): 163 """Interleave list of addrinfo tuples by family.""" 164 # Group addresses by family 165 addrinfos_by_family = collections.OrderedDict() 166 for addr in addrinfos: 167 family = addr[0] 168 if family not in addrinfos_by_family: 169 addrinfos_by_family[family] = [] 170 addrinfos_by_family[family].append(addr) 171 addrinfos_lists = list(addrinfos_by_family.values()) 172 173 reordered = [] 174 if first_address_family_count > 1: 175 reordered.extend(addrinfos_lists[0][:first_address_family_count - 1]) 176 del addrinfos_lists[0][:first_address_family_count - 1] 177 reordered.extend( 178 a for a in itertools.chain.from_iterable( 179 itertools.zip_longest(*addrinfos_lists) 180 ) if a is not None) 181 return reordered 182 183 184def _run_until_complete_cb(fut): 185 if not fut.cancelled(): 186 exc = fut.exception() 187 if isinstance(exc, (SystemExit, KeyboardInterrupt)): 188 # Issue #22429: run_forever() already finished, no need to 189 # stop it. 190 return 191 futures._get_loop(fut).stop() 192 193 194if hasattr(socket, 'TCP_NODELAY'): 195 def _set_nodelay(sock): 196 if (sock.family in {socket.AF_INET, socket.AF_INET6} and 197 sock.type == socket.SOCK_STREAM and 198 sock.proto == socket.IPPROTO_TCP): 199 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 200else: 201 def _set_nodelay(sock): 202 pass 203 204 205class _SendfileFallbackProtocol(protocols.Protocol): 206 def __init__(self, transp): 207 if not isinstance(transp, transports._FlowControlMixin): 208 raise TypeError("transport should be _FlowControlMixin instance") 209 self._transport = transp 210 self._proto = transp.get_protocol() 211 self._should_resume_reading = transp.is_reading() 212 self._should_resume_writing = transp._protocol_paused 213 transp.pause_reading() 214 transp.set_protocol(self) 215 if self._should_resume_writing: 216 self._write_ready_fut = self._transport._loop.create_future() 217 else: 218 self._write_ready_fut = None 219 220 async def drain(self): 221 if self._transport.is_closing(): 222 raise ConnectionError("Connection closed by peer") 223 fut = self._write_ready_fut 224 if fut is None: 225 return 226 await fut 227 228 def connection_made(self, transport): 229 raise RuntimeError("Invalid state: " 230 "connection should have been established already.") 231 232 def connection_lost(self, exc): 233 if self._write_ready_fut is not None: 234 # Never happens if peer disconnects after sending the whole content 235 # Thus disconnection is always an exception from user perspective 236 if exc is None: 237 self._write_ready_fut.set_exception( 238 ConnectionError("Connection is closed by peer")) 239 else: 240 self._write_ready_fut.set_exception(exc) 241 self._proto.connection_lost(exc) 242 243 def pause_writing(self): 244 if self._write_ready_fut is not None: 245 return 246 self._write_ready_fut = self._transport._loop.create_future() 247 248 def resume_writing(self): 249 if self._write_ready_fut is None: 250 return 251 self._write_ready_fut.set_result(False) 252 self._write_ready_fut = None 253 254 def data_received(self, data): 255 raise RuntimeError("Invalid state: reading should be paused") 256 257 def eof_received(self): 258 raise RuntimeError("Invalid state: reading should be paused") 259 260 async def restore(self): 261 self._transport.set_protocol(self._proto) 262 if self._should_resume_reading: 263 self._transport.resume_reading() 264 if self._write_ready_fut is not None: 265 # Cancel the future. 266 # Basically it has no effect because protocol is switched back, 267 # no code should wait for it anymore. 268 self._write_ready_fut.cancel() 269 if self._should_resume_writing: 270 self._proto.resume_writing() 271 272 273class Server(events.AbstractServer): 274 275 def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog, 276 ssl_handshake_timeout): 277 self._loop = loop 278 self._sockets = sockets 279 self._active_count = 0 280 self._waiters = [] 281 self._protocol_factory = protocol_factory 282 self._backlog = backlog 283 self._ssl_context = ssl_context 284 self._ssl_handshake_timeout = ssl_handshake_timeout 285 self._serving = False 286 self._serving_forever_fut = None 287 288 def __repr__(self): 289 return f'<{self.__class__.__name__} sockets={self.sockets!r}>' 290 291 def _attach(self): 292 assert self._sockets is not None 293 self._active_count += 1 294 295 def _detach(self): 296 assert self._active_count > 0 297 self._active_count -= 1 298 if self._active_count == 0 and self._sockets is None: 299 self._wakeup() 300 301 def _wakeup(self): 302 waiters = self._waiters 303 self._waiters = None 304 for waiter in waiters: 305 if not waiter.done(): 306 waiter.set_result(waiter) 307 308 def _start_serving(self): 309 if self._serving: 310 return 311 self._serving = True 312 for sock in self._sockets: 313 sock.listen(self._backlog) 314 self._loop._start_serving( 315 self._protocol_factory, sock, self._ssl_context, 316 self, self._backlog, self._ssl_handshake_timeout) 317 318 def get_loop(self): 319 return self._loop 320 321 def is_serving(self): 322 return self._serving 323 324 @property 325 def sockets(self): 326 if self._sockets is None: 327 return () 328 return tuple(trsock.TransportSocket(s) for s in self._sockets) 329 330 def close(self): 331 sockets = self._sockets 332 if sockets is None: 333 return 334 self._sockets = None 335 336 for sock in sockets: 337 self._loop._stop_serving(sock) 338 339 self._serving = False 340 341 if (self._serving_forever_fut is not None and 342 not self._serving_forever_fut.done()): 343 self._serving_forever_fut.cancel() 344 self._serving_forever_fut = None 345 346 if self._active_count == 0: 347 self._wakeup() 348 349 async def start_serving(self): 350 self._start_serving() 351 # Skip one loop iteration so that all 'loop.add_reader' 352 # go through. 353 await tasks.sleep(0, loop=self._loop) 354 355 async def serve_forever(self): 356 if self._serving_forever_fut is not None: 357 raise RuntimeError( 358 f'server {self!r} is already being awaited on serve_forever()') 359 if self._sockets is None: 360 raise RuntimeError(f'server {self!r} is closed') 361 362 self._start_serving() 363 self._serving_forever_fut = self._loop.create_future() 364 365 try: 366 await self._serving_forever_fut 367 except exceptions.CancelledError: 368 try: 369 self.close() 370 await self.wait_closed() 371 finally: 372 raise 373 finally: 374 self._serving_forever_fut = None 375 376 async def wait_closed(self): 377 if self._sockets is None or self._waiters is None: 378 return 379 waiter = self._loop.create_future() 380 self._waiters.append(waiter) 381 await waiter 382 383 384class BaseEventLoop(events.AbstractEventLoop): 385 386 def __init__(self): 387 self._timer_cancelled_count = 0 388 self._closed = False 389 self._stopping = False 390 self._ready = collections.deque() 391 self._scheduled = [] 392 self._default_executor = None 393 self._internal_fds = 0 394 # Identifier of the thread running the event loop, or None if the 395 # event loop is not running 396 self._thread_id = None 397 self._clock_resolution = time.get_clock_info('monotonic').resolution 398 self._exception_handler = None 399 self.set_debug(coroutines._is_debug_mode()) 400 # In debug mode, if the execution of a callback or a step of a task 401 # exceed this duration in seconds, the slow callback/task is logged. 402 self.slow_callback_duration = 0.1 403 self._current_handle = None 404 self._task_factory = None 405 self._coroutine_origin_tracking_enabled = False 406 self._coroutine_origin_tracking_saved_depth = None 407 408 # A weak set of all asynchronous generators that are 409 # being iterated by the loop. 410 self._asyncgens = weakref.WeakSet() 411 # Set to True when `loop.shutdown_asyncgens` is called. 412 self._asyncgens_shutdown_called = False 413 414 def __repr__(self): 415 return ( 416 f'<{self.__class__.__name__} running={self.is_running()} ' 417 f'closed={self.is_closed()} debug={self.get_debug()}>' 418 ) 419 420 def create_future(self): 421 """Create a Future object attached to the loop.""" 422 return futures.Future(loop=self) 423 424 def create_task(self, coro, *, name=None): 425 """Schedule a coroutine object. 426 427 Return a task object. 428 """ 429 self._check_closed() 430 if self._task_factory is None: 431 task = tasks.Task(coro, loop=self, name=name) 432 if task._source_traceback: 433 del task._source_traceback[-1] 434 else: 435 task = self._task_factory(self, coro) 436 tasks._set_task_name(task, name) 437 438 return task 439 440 def set_task_factory(self, factory): 441 """Set a task factory that will be used by loop.create_task(). 442 443 If factory is None the default task factory will be set. 444 445 If factory is a callable, it should have a signature matching 446 '(loop, coro)', where 'loop' will be a reference to the active 447 event loop, 'coro' will be a coroutine object. The callable 448 must return a Future. 449 """ 450 if factory is not None and not callable(factory): 451 raise TypeError('task factory must be a callable or None') 452 self._task_factory = factory 453 454 def get_task_factory(self): 455 """Return a task factory, or None if the default one is in use.""" 456 return self._task_factory 457 458 def _make_socket_transport(self, sock, protocol, waiter=None, *, 459 extra=None, server=None): 460 """Create socket transport.""" 461 raise NotImplementedError 462 463 def _make_ssl_transport( 464 self, rawsock, protocol, sslcontext, waiter=None, 465 *, server_side=False, server_hostname=None, 466 extra=None, server=None, 467 ssl_handshake_timeout=None, 468 call_connection_made=True): 469 """Create SSL transport.""" 470 raise NotImplementedError 471 472 def _make_datagram_transport(self, sock, protocol, 473 address=None, waiter=None, extra=None): 474 """Create datagram transport.""" 475 raise NotImplementedError 476 477 def _make_read_pipe_transport(self, pipe, protocol, waiter=None, 478 extra=None): 479 """Create read pipe transport.""" 480 raise NotImplementedError 481 482 def _make_write_pipe_transport(self, pipe, protocol, waiter=None, 483 extra=None): 484 """Create write pipe transport.""" 485 raise NotImplementedError 486 487 async def _make_subprocess_transport(self, protocol, args, shell, 488 stdin, stdout, stderr, bufsize, 489 extra=None, **kwargs): 490 """Create subprocess transport.""" 491 raise NotImplementedError 492 493 def _write_to_self(self): 494 """Write a byte to self-pipe, to wake up the event loop. 495 496 This may be called from a different thread. 497 498 The subclass is responsible for implementing the self-pipe. 499 """ 500 raise NotImplementedError 501 502 def _process_events(self, event_list): 503 """Process selector events.""" 504 raise NotImplementedError 505 506 def _check_closed(self): 507 if self._closed: 508 raise RuntimeError('Event loop is closed') 509 510 def _asyncgen_finalizer_hook(self, agen): 511 self._asyncgens.discard(agen) 512 if not self.is_closed(): 513 self.call_soon_threadsafe(self.create_task, agen.aclose()) 514 515 def _asyncgen_firstiter_hook(self, agen): 516 if self._asyncgens_shutdown_called: 517 warnings.warn( 518 f"asynchronous generator {agen!r} was scheduled after " 519 f"loop.shutdown_asyncgens() call", 520 ResourceWarning, source=self) 521 522 self._asyncgens.add(agen) 523 524 async def shutdown_asyncgens(self): 525 """Shutdown all active asynchronous generators.""" 526 self._asyncgens_shutdown_called = True 527 528 if not len(self._asyncgens): 529 # If Python version is <3.6 or we don't have any asynchronous 530 # generators alive. 531 return 532 533 closing_agens = list(self._asyncgens) 534 self._asyncgens.clear() 535 536 results = await tasks.gather( 537 *[ag.aclose() for ag in closing_agens], 538 return_exceptions=True, 539 loop=self) 540 541 for result, agen in zip(results, closing_agens): 542 if isinstance(result, Exception): 543 self.call_exception_handler({ 544 'message': f'an error occurred during closing of ' 545 f'asynchronous generator {agen!r}', 546 'exception': result, 547 'asyncgen': agen 548 }) 549 550 def _check_running(self): 551 if self.is_running(): 552 raise RuntimeError('This event loop is already running') 553 if events._get_running_loop() is not None: 554 raise RuntimeError( 555 'Cannot run the event loop while another loop is running') 556 557 def run_forever(self): 558 """Run until stop() is called.""" 559 self._check_closed() 560 self._check_running() 561 self._set_coroutine_origin_tracking(self._debug) 562 self._thread_id = threading.get_ident() 563 564 old_agen_hooks = sys.get_asyncgen_hooks() 565 sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook, 566 finalizer=self._asyncgen_finalizer_hook) 567 try: 568 events._set_running_loop(self) 569 while True: 570 self._run_once() 571 if self._stopping: 572 break 573 finally: 574 self._stopping = False 575 self._thread_id = None 576 events._set_running_loop(None) 577 self._set_coroutine_origin_tracking(False) 578 sys.set_asyncgen_hooks(*old_agen_hooks) 579 580 def run_until_complete(self, future): 581 """Run until the Future is done. 582 583 If the argument is a coroutine, it is wrapped in a Task. 584 585 WARNING: It would be disastrous to call run_until_complete() 586 with the same coroutine twice -- it would wrap it in two 587 different Tasks and that can't be good. 588 589 Return the Future's result, or raise its exception. 590 """ 591 self._check_closed() 592 self._check_running() 593 594 new_task = not futures.isfuture(future) 595 future = tasks.ensure_future(future, loop=self) 596 if new_task: 597 # An exception is raised if the future didn't complete, so there 598 # is no need to log the "destroy pending task" message 599 future._log_destroy_pending = False 600 601 future.add_done_callback(_run_until_complete_cb) 602 try: 603 self.run_forever() 604 except: 605 if new_task and future.done() and not future.cancelled(): 606 # The coroutine raised a BaseException. Consume the exception 607 # to not log a warning, the caller doesn't have access to the 608 # local task. 609 future.exception() 610 raise 611 finally: 612 future.remove_done_callback(_run_until_complete_cb) 613 if not future.done(): 614 raise RuntimeError('Event loop stopped before Future completed.') 615 616 return future.result() 617 618 def stop(self): 619 """Stop running the event loop. 620 621 Every callback already scheduled will still run. This simply informs 622 run_forever to stop looping after a complete iteration. 623 """ 624 self._stopping = True 625 626 def close(self): 627 """Close the event loop. 628 629 This clears the queues and shuts down the executor, 630 but does not wait for the executor to finish. 631 632 The event loop must not be running. 633 """ 634 if self.is_running(): 635 raise RuntimeError("Cannot close a running event loop") 636 if self._closed: 637 return 638 if self._debug: 639 logger.debug("Close %r", self) 640 self._closed = True 641 self._ready.clear() 642 self._scheduled.clear() 643 executor = self._default_executor 644 if executor is not None: 645 self._default_executor = None 646 executor.shutdown(wait=False) 647 648 def is_closed(self): 649 """Returns True if the event loop was closed.""" 650 return self._closed 651 652 def __del__(self, _warn=warnings.warn): 653 if not self.is_closed(): 654 _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self) 655 if not self.is_running(): 656 self.close() 657 658 def is_running(self): 659 """Returns True if the event loop is running.""" 660 return (self._thread_id is not None) 661 662 def time(self): 663 """Return the time according to the event loop's clock. 664 665 This is a float expressed in seconds since an epoch, but the 666 epoch, precision, accuracy and drift are unspecified and may 667 differ per event loop. 668 """ 669 return time.monotonic() 670 671 def call_later(self, delay, callback, *args, context=None): 672 """Arrange for a callback to be called at a given time. 673 674 Return a Handle: an opaque object with a cancel() method that 675 can be used to cancel the call. 676 677 The delay can be an int or float, expressed in seconds. It is 678 always relative to the current time. 679 680 Each callback will be called exactly once. If two callbacks 681 are scheduled for exactly the same time, it undefined which 682 will be called first. 683 684 Any positional arguments after the callback will be passed to 685 the callback when it is called. 686 """ 687 timer = self.call_at(self.time() + delay, callback, *args, 688 context=context) 689 if timer._source_traceback: 690 del timer._source_traceback[-1] 691 return timer 692 693 def call_at(self, when, callback, *args, context=None): 694 """Like call_later(), but uses an absolute time. 695 696 Absolute time corresponds to the event loop's time() method. 697 """ 698 self._check_closed() 699 if self._debug: 700 self._check_thread() 701 self._check_callback(callback, 'call_at') 702 timer = events.TimerHandle(when, callback, args, self, context) 703 if timer._source_traceback: 704 del timer._source_traceback[-1] 705 heapq.heappush(self._scheduled, timer) 706 timer._scheduled = True 707 return timer 708 709 def call_soon(self, callback, *args, context=None): 710 """Arrange for a callback to be called as soon as possible. 711 712 This operates as a FIFO queue: callbacks are called in the 713 order in which they are registered. Each callback will be 714 called exactly once. 715 716 Any positional arguments after the callback will be passed to 717 the callback when it is called. 718 """ 719 self._check_closed() 720 if self._debug: 721 self._check_thread() 722 self._check_callback(callback, 'call_soon') 723 handle = self._call_soon(callback, args, context) 724 if handle._source_traceback: 725 del handle._source_traceback[-1] 726 return handle 727 728 def _check_callback(self, callback, method): 729 if (coroutines.iscoroutine(callback) or 730 coroutines.iscoroutinefunction(callback)): 731 raise TypeError( 732 f"coroutines cannot be used with {method}()") 733 if not callable(callback): 734 raise TypeError( 735 f'a callable object was expected by {method}(), ' 736 f'got {callback!r}') 737 738 def _call_soon(self, callback, args, context): 739 handle = events.Handle(callback, args, self, context) 740 if handle._source_traceback: 741 del handle._source_traceback[-1] 742 self._ready.append(handle) 743 return handle 744 745 def _check_thread(self): 746 """Check that the current thread is the thread running the event loop. 747 748 Non-thread-safe methods of this class make this assumption and will 749 likely behave incorrectly when the assumption is violated. 750 751 Should only be called when (self._debug == True). The caller is 752 responsible for checking this condition for performance reasons. 753 """ 754 if self._thread_id is None: 755 return 756 thread_id = threading.get_ident() 757 if thread_id != self._thread_id: 758 raise RuntimeError( 759 "Non-thread-safe operation invoked on an event loop other " 760 "than the current one") 761 762 def call_soon_threadsafe(self, callback, *args, context=None): 763 """Like call_soon(), but thread-safe.""" 764 self._check_closed() 765 if self._debug: 766 self._check_callback(callback, 'call_soon_threadsafe') 767 handle = self._call_soon(callback, args, context) 768 if handle._source_traceback: 769 del handle._source_traceback[-1] 770 self._write_to_self() 771 return handle 772 773 def run_in_executor(self, executor, func, *args): 774 self._check_closed() 775 if self._debug: 776 self._check_callback(func, 'run_in_executor') 777 if executor is None: 778 executor = self._default_executor 779 if executor is None: 780 executor = concurrent.futures.ThreadPoolExecutor() 781 self._default_executor = executor 782 return futures.wrap_future( 783 executor.submit(func, *args), loop=self) 784 785 def set_default_executor(self, executor): 786 if not isinstance(executor, concurrent.futures.ThreadPoolExecutor): 787 warnings.warn( 788 'Using the default executor that is not an instance of ' 789 'ThreadPoolExecutor is deprecated and will be prohibited ' 790 'in Python 3.9', 791 DeprecationWarning, 2) 792 self._default_executor = executor 793 794 def _getaddrinfo_debug(self, host, port, family, type, proto, flags): 795 msg = [f"{host}:{port!r}"] 796 if family: 797 msg.append(f'family={family!r}') 798 if type: 799 msg.append(f'type={type!r}') 800 if proto: 801 msg.append(f'proto={proto!r}') 802 if flags: 803 msg.append(f'flags={flags!r}') 804 msg = ', '.join(msg) 805 logger.debug('Get address info %s', msg) 806 807 t0 = self.time() 808 addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags) 809 dt = self.time() - t0 810 811 msg = f'Getting address info {msg} took {dt * 1e3:.3f}ms: {addrinfo!r}' 812 if dt >= self.slow_callback_duration: 813 logger.info(msg) 814 else: 815 logger.debug(msg) 816 return addrinfo 817 818 async def getaddrinfo(self, host, port, *, 819 family=0, type=0, proto=0, flags=0): 820 if self._debug: 821 getaddr_func = self._getaddrinfo_debug 822 else: 823 getaddr_func = socket.getaddrinfo 824 825 return await self.run_in_executor( 826 None, getaddr_func, host, port, family, type, proto, flags) 827 828 async def getnameinfo(self, sockaddr, flags=0): 829 return await self.run_in_executor( 830 None, socket.getnameinfo, sockaddr, flags) 831 832 async def sock_sendfile(self, sock, file, offset=0, count=None, 833 *, fallback=True): 834 if self._debug and sock.gettimeout() != 0: 835 raise ValueError("the socket must be non-blocking") 836 self._check_sendfile_params(sock, file, offset, count) 837 try: 838 return await self._sock_sendfile_native(sock, file, 839 offset, count) 840 except exceptions.SendfileNotAvailableError as exc: 841 if not fallback: 842 raise 843 return await self._sock_sendfile_fallback(sock, file, 844 offset, count) 845 846 async def _sock_sendfile_native(self, sock, file, offset, count): 847 # NB: sendfile syscall is not supported for SSL sockets and 848 # non-mmap files even if sendfile is supported by OS 849 raise exceptions.SendfileNotAvailableError( 850 f"syscall sendfile is not available for socket {sock!r} " 851 "and file {file!r} combination") 852 853 async def _sock_sendfile_fallback(self, sock, file, offset, count): 854 if offset: 855 file.seek(offset) 856 blocksize = ( 857 min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE) 858 if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE 859 ) 860 buf = bytearray(blocksize) 861 total_sent = 0 862 try: 863 while True: 864 if count: 865 blocksize = min(count - total_sent, blocksize) 866 if blocksize <= 0: 867 break 868 view = memoryview(buf)[:blocksize] 869 read = await self.run_in_executor(None, file.readinto, view) 870 if not read: 871 break # EOF 872 await self.sock_sendall(sock, view[:read]) 873 total_sent += read 874 return total_sent 875 finally: 876 if total_sent > 0 and hasattr(file, 'seek'): 877 file.seek(offset + total_sent) 878 879 def _check_sendfile_params(self, sock, file, offset, count): 880 if 'b' not in getattr(file, 'mode', 'b'): 881 raise ValueError("file should be opened in binary mode") 882 if not sock.type == socket.SOCK_STREAM: 883 raise ValueError("only SOCK_STREAM type sockets are supported") 884 if count is not None: 885 if not isinstance(count, int): 886 raise TypeError( 887 "count must be a positive integer (got {!r})".format(count)) 888 if count <= 0: 889 raise ValueError( 890 "count must be a positive integer (got {!r})".format(count)) 891 if not isinstance(offset, int): 892 raise TypeError( 893 "offset must be a non-negative integer (got {!r})".format( 894 offset)) 895 if offset < 0: 896 raise ValueError( 897 "offset must be a non-negative integer (got {!r})".format( 898 offset)) 899 900 async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None): 901 """Create, bind and connect one socket.""" 902 my_exceptions = [] 903 exceptions.append(my_exceptions) 904 family, type_, proto, _, address = addr_info 905 sock = None 906 try: 907 sock = socket.socket(family=family, type=type_, proto=proto) 908 sock.setblocking(False) 909 if local_addr_infos is not None: 910 for _, _, _, _, laddr in local_addr_infos: 911 try: 912 sock.bind(laddr) 913 break 914 except OSError as exc: 915 msg = ( 916 f'error while attempting to bind on ' 917 f'address {laddr!r}: ' 918 f'{exc.strerror.lower()}' 919 ) 920 exc = OSError(exc.errno, msg) 921 my_exceptions.append(exc) 922 else: # all bind attempts failed 923 raise my_exceptions.pop() 924 await self.sock_connect(sock, address) 925 return sock 926 except OSError as exc: 927 my_exceptions.append(exc) 928 if sock is not None: 929 sock.close() 930 raise 931 except: 932 if sock is not None: 933 sock.close() 934 raise 935 936 async def create_connection( 937 self, protocol_factory, host=None, port=None, 938 *, ssl=None, family=0, 939 proto=0, flags=0, sock=None, 940 local_addr=None, server_hostname=None, 941 ssl_handshake_timeout=None, 942 happy_eyeballs_delay=None, interleave=None): 943 """Connect to a TCP server. 944 945 Create a streaming transport connection to a given Internet host and 946 port: socket family AF_INET or socket.AF_INET6 depending on host (or 947 family if specified), socket type SOCK_STREAM. protocol_factory must be 948 a callable returning a protocol instance. 949 950 This method is a coroutine which will try to establish the connection 951 in the background. When successful, the coroutine returns a 952 (transport, protocol) pair. 953 """ 954 if server_hostname is not None and not ssl: 955 raise ValueError('server_hostname is only meaningful with ssl') 956 957 if server_hostname is None and ssl: 958 # Use host as default for server_hostname. It is an error 959 # if host is empty or not set, e.g. when an 960 # already-connected socket was passed or when only a port 961 # is given. To avoid this error, you can pass 962 # server_hostname='' -- this will bypass the hostname 963 # check. (This also means that if host is a numeric 964 # IP/IPv6 address, we will attempt to verify that exact 965 # address; this will probably fail, but it is possible to 966 # create a certificate for a specific IP address, so we 967 # don't judge it here.) 968 if not host: 969 raise ValueError('You must set server_hostname ' 970 'when using ssl without a host') 971 server_hostname = host 972 973 if ssl_handshake_timeout is not None and not ssl: 974 raise ValueError( 975 'ssl_handshake_timeout is only meaningful with ssl') 976 977 if happy_eyeballs_delay is not None and interleave is None: 978 # If using happy eyeballs, default to interleave addresses by family 979 interleave = 1 980 981 if host is not None or port is not None: 982 if sock is not None: 983 raise ValueError( 984 'host/port and sock can not be specified at the same time') 985 986 infos = await self._ensure_resolved( 987 (host, port), family=family, 988 type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self) 989 if not infos: 990 raise OSError('getaddrinfo() returned empty list') 991 992 if local_addr is not None: 993 laddr_infos = await self._ensure_resolved( 994 local_addr, family=family, 995 type=socket.SOCK_STREAM, proto=proto, 996 flags=flags, loop=self) 997 if not laddr_infos: 998 raise OSError('getaddrinfo() returned empty list') 999 else: 1000 laddr_infos = None 1001 1002 if interleave: 1003 infos = _interleave_addrinfos(infos, interleave) 1004 1005 exceptions = [] 1006 if happy_eyeballs_delay is None: 1007 # not using happy eyeballs 1008 for addrinfo in infos: 1009 try: 1010 sock = await self._connect_sock( 1011 exceptions, addrinfo, laddr_infos) 1012 break 1013 except OSError: 1014 continue 1015 else: # using happy eyeballs 1016 sock, _, _ = await staggered.staggered_race( 1017 (functools.partial(self._connect_sock, 1018 exceptions, addrinfo, laddr_infos) 1019 for addrinfo in infos), 1020 happy_eyeballs_delay, loop=self) 1021 1022 if sock is None: 1023 exceptions = [exc for sub in exceptions for exc in sub] 1024 if len(exceptions) == 1: 1025 raise exceptions[0] 1026 else: 1027 # If they all have the same str(), raise one. 1028 model = str(exceptions[0]) 1029 if all(str(exc) == model for exc in exceptions): 1030 raise exceptions[0] 1031 # Raise a combined exception so the user can see all 1032 # the various error messages. 1033 raise OSError('Multiple exceptions: {}'.format( 1034 ', '.join(str(exc) for exc in exceptions))) 1035 1036 else: 1037 if sock is None: 1038 raise ValueError( 1039 'host and port was not specified and no sock specified') 1040 if sock.type != socket.SOCK_STREAM: 1041 # We allow AF_INET, AF_INET6, AF_UNIX as long as they 1042 # are SOCK_STREAM. 1043 # We support passing AF_UNIX sockets even though we have 1044 # a dedicated API for that: create_unix_connection. 1045 # Disallowing AF_UNIX in this method, breaks backwards 1046 # compatibility. 1047 raise ValueError( 1048 f'A Stream Socket was expected, got {sock!r}') 1049 1050 transport, protocol = await self._create_connection_transport( 1051 sock, protocol_factory, ssl, server_hostname, 1052 ssl_handshake_timeout=ssl_handshake_timeout) 1053 if self._debug: 1054 # Get the socket from the transport because SSL transport closes 1055 # the old socket and creates a new SSL socket 1056 sock = transport.get_extra_info('socket') 1057 logger.debug("%r connected to %s:%r: (%r, %r)", 1058 sock, host, port, transport, protocol) 1059 return transport, protocol 1060 1061 async def _create_connection_transport( 1062 self, sock, protocol_factory, ssl, 1063 server_hostname, server_side=False, 1064 ssl_handshake_timeout=None): 1065 1066 sock.setblocking(False) 1067 1068 protocol = protocol_factory() 1069 waiter = self.create_future() 1070 if ssl: 1071 sslcontext = None if isinstance(ssl, bool) else ssl 1072 transport = self._make_ssl_transport( 1073 sock, protocol, sslcontext, waiter, 1074 server_side=server_side, server_hostname=server_hostname, 1075 ssl_handshake_timeout=ssl_handshake_timeout) 1076 else: 1077 transport = self._make_socket_transport(sock, protocol, waiter) 1078 1079 try: 1080 await waiter 1081 except: 1082 transport.close() 1083 raise 1084 1085 return transport, protocol 1086 1087 async def sendfile(self, transport, file, offset=0, count=None, 1088 *, fallback=True): 1089 """Send a file to transport. 1090 1091 Return the total number of bytes which were sent. 1092 1093 The method uses high-performance os.sendfile if available. 1094 1095 file must be a regular file object opened in binary mode. 1096 1097 offset tells from where to start reading the file. If specified, 1098 count is the total number of bytes to transmit as opposed to 1099 sending the file until EOF is reached. File position is updated on 1100 return or also in case of error in which case file.tell() 1101 can be used to figure out the number of bytes 1102 which were sent. 1103 1104 fallback set to True makes asyncio to manually read and send 1105 the file when the platform does not support the sendfile syscall 1106 (e.g. Windows or SSL socket on Unix). 1107 1108 Raise SendfileNotAvailableError if the system does not support 1109 sendfile syscall and fallback is False. 1110 """ 1111 if transport.is_closing(): 1112 raise RuntimeError("Transport is closing") 1113 mode = getattr(transport, '_sendfile_compatible', 1114 constants._SendfileMode.UNSUPPORTED) 1115 if mode is constants._SendfileMode.UNSUPPORTED: 1116 raise RuntimeError( 1117 f"sendfile is not supported for transport {transport!r}") 1118 if mode is constants._SendfileMode.TRY_NATIVE: 1119 try: 1120 return await self._sendfile_native(transport, file, 1121 offset, count) 1122 except exceptions.SendfileNotAvailableError as exc: 1123 if not fallback: 1124 raise 1125 1126 if not fallback: 1127 raise RuntimeError( 1128 f"fallback is disabled and native sendfile is not " 1129 f"supported for transport {transport!r}") 1130 1131 return await self._sendfile_fallback(transport, file, 1132 offset, count) 1133 1134 async def _sendfile_native(self, transp, file, offset, count): 1135 raise exceptions.SendfileNotAvailableError( 1136 "sendfile syscall is not supported") 1137 1138 async def _sendfile_fallback(self, transp, file, offset, count): 1139 if offset: 1140 file.seek(offset) 1141 blocksize = min(count, 16384) if count else 16384 1142 buf = bytearray(blocksize) 1143 total_sent = 0 1144 proto = _SendfileFallbackProtocol(transp) 1145 try: 1146 while True: 1147 if count: 1148 blocksize = min(count - total_sent, blocksize) 1149 if blocksize <= 0: 1150 return total_sent 1151 view = memoryview(buf)[:blocksize] 1152 read = await self.run_in_executor(None, file.readinto, view) 1153 if not read: 1154 return total_sent # EOF 1155 await proto.drain() 1156 transp.write(view[:read]) 1157 total_sent += read 1158 finally: 1159 if total_sent > 0 and hasattr(file, 'seek'): 1160 file.seek(offset + total_sent) 1161 await proto.restore() 1162 1163 async def start_tls(self, transport, protocol, sslcontext, *, 1164 server_side=False, 1165 server_hostname=None, 1166 ssl_handshake_timeout=None): 1167 """Upgrade transport to TLS. 1168 1169 Return a new transport that *protocol* should start using 1170 immediately. 1171 """ 1172 if ssl is None: 1173 raise RuntimeError('Python ssl module is not available') 1174 1175 if not isinstance(sslcontext, ssl.SSLContext): 1176 raise TypeError( 1177 f'sslcontext is expected to be an instance of ssl.SSLContext, ' 1178 f'got {sslcontext!r}') 1179 1180 if not getattr(transport, '_start_tls_compatible', False): 1181 raise TypeError( 1182 f'transport {transport!r} is not supported by start_tls()') 1183 1184 waiter = self.create_future() 1185 ssl_protocol = sslproto.SSLProtocol( 1186 self, protocol, sslcontext, waiter, 1187 server_side, server_hostname, 1188 ssl_handshake_timeout=ssl_handshake_timeout, 1189 call_connection_made=False) 1190 1191 # Pause early so that "ssl_protocol.data_received()" doesn't 1192 # have a chance to get called before "ssl_protocol.connection_made()". 1193 transport.pause_reading() 1194 1195 transport.set_protocol(ssl_protocol) 1196 conmade_cb = self.call_soon(ssl_protocol.connection_made, transport) 1197 resume_cb = self.call_soon(transport.resume_reading) 1198 1199 try: 1200 await waiter 1201 except BaseException: 1202 transport.close() 1203 conmade_cb.cancel() 1204 resume_cb.cancel() 1205 raise 1206 1207 return ssl_protocol._app_transport 1208 1209 async def create_datagram_endpoint(self, protocol_factory, 1210 local_addr=None, remote_addr=None, *, 1211 family=0, proto=0, flags=0, 1212 reuse_address=_unset, reuse_port=None, 1213 allow_broadcast=None, sock=None): 1214 """Create datagram connection.""" 1215 if sock is not None: 1216 if sock.type != socket.SOCK_DGRAM: 1217 raise ValueError( 1218 f'A UDP Socket was expected, got {sock!r}') 1219 if (local_addr or remote_addr or 1220 family or proto or flags or 1221 reuse_port or allow_broadcast): 1222 # show the problematic kwargs in exception msg 1223 opts = dict(local_addr=local_addr, remote_addr=remote_addr, 1224 family=family, proto=proto, flags=flags, 1225 reuse_address=reuse_address, reuse_port=reuse_port, 1226 allow_broadcast=allow_broadcast) 1227 problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v) 1228 raise ValueError( 1229 f'socket modifier keyword arguments can not be used ' 1230 f'when sock is specified. ({problems})') 1231 sock.setblocking(False) 1232 r_addr = None 1233 else: 1234 if not (local_addr or remote_addr): 1235 if family == 0: 1236 raise ValueError('unexpected address family') 1237 addr_pairs_info = (((family, proto), (None, None)),) 1238 elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX: 1239 for addr in (local_addr, remote_addr): 1240 if addr is not None and not isinstance(addr, str): 1241 raise TypeError('string is expected') 1242 1243 if local_addr and local_addr[0] not in (0, '\x00'): 1244 try: 1245 if stat.S_ISSOCK(os.stat(local_addr).st_mode): 1246 os.remove(local_addr) 1247 except FileNotFoundError: 1248 pass 1249 except OSError as err: 1250 # Directory may have permissions only to create socket. 1251 logger.error('Unable to check or remove stale UNIX ' 1252 'socket %r: %r', 1253 local_addr, err) 1254 1255 addr_pairs_info = (((family, proto), 1256 (local_addr, remote_addr)), ) 1257 else: 1258 # join address by (family, protocol) 1259 addr_infos = {} # Using order preserving dict 1260 for idx, addr in ((0, local_addr), (1, remote_addr)): 1261 if addr is not None: 1262 assert isinstance(addr, tuple) and len(addr) == 2, ( 1263 '2-tuple is expected') 1264 1265 infos = await self._ensure_resolved( 1266 addr, family=family, type=socket.SOCK_DGRAM, 1267 proto=proto, flags=flags, loop=self) 1268 if not infos: 1269 raise OSError('getaddrinfo() returned empty list') 1270 1271 for fam, _, pro, _, address in infos: 1272 key = (fam, pro) 1273 if key not in addr_infos: 1274 addr_infos[key] = [None, None] 1275 addr_infos[key][idx] = address 1276 1277 # each addr has to have info for each (family, proto) pair 1278 addr_pairs_info = [ 1279 (key, addr_pair) for key, addr_pair in addr_infos.items() 1280 if not ((local_addr and addr_pair[0] is None) or 1281 (remote_addr and addr_pair[1] is None))] 1282 1283 if not addr_pairs_info: 1284 raise ValueError('can not get address information') 1285 1286 exceptions = [] 1287 1288 # bpo-37228 1289 if reuse_address is not _unset: 1290 if reuse_address: 1291 raise ValueError("Passing `reuse_address=True` is no " 1292 "longer supported, as the usage of " 1293 "SO_REUSEPORT in UDP poses a significant " 1294 "security concern.") 1295 else: 1296 warnings.warn("The *reuse_address* parameter has been " 1297 "deprecated as of 3.5.10 and is scheduled " 1298 "for removal in 3.11.", DeprecationWarning, 1299 stacklevel=2) 1300 1301 for ((family, proto), 1302 (local_address, remote_address)) in addr_pairs_info: 1303 sock = None 1304 r_addr = None 1305 try: 1306 sock = socket.socket( 1307 family=family, type=socket.SOCK_DGRAM, proto=proto) 1308 if reuse_port: 1309 _set_reuseport(sock) 1310 if allow_broadcast: 1311 sock.setsockopt( 1312 socket.SOL_SOCKET, socket.SO_BROADCAST, 1) 1313 sock.setblocking(False) 1314 1315 if local_addr: 1316 sock.bind(local_address) 1317 if remote_addr: 1318 if not allow_broadcast: 1319 await self.sock_connect(sock, remote_address) 1320 r_addr = remote_address 1321 except OSError as exc: 1322 if sock is not None: 1323 sock.close() 1324 exceptions.append(exc) 1325 except: 1326 if sock is not None: 1327 sock.close() 1328 raise 1329 else: 1330 break 1331 else: 1332 raise exceptions[0] 1333 1334 protocol = protocol_factory() 1335 waiter = self.create_future() 1336 transport = self._make_datagram_transport( 1337 sock, protocol, r_addr, waiter) 1338 if self._debug: 1339 if local_addr: 1340 logger.info("Datagram endpoint local_addr=%r remote_addr=%r " 1341 "created: (%r, %r)", 1342 local_addr, remote_addr, transport, protocol) 1343 else: 1344 logger.debug("Datagram endpoint remote_addr=%r created: " 1345 "(%r, %r)", 1346 remote_addr, transport, protocol) 1347 1348 try: 1349 await waiter 1350 except: 1351 transport.close() 1352 raise 1353 1354 return transport, protocol 1355 1356 async def _ensure_resolved(self, address, *, 1357 family=0, type=socket.SOCK_STREAM, 1358 proto=0, flags=0, loop): 1359 host, port = address[:2] 1360 info = _ipaddr_info(host, port, family, type, proto, *address[2:]) 1361 if info is not None: 1362 # "host" is already a resolved IP. 1363 return [info] 1364 else: 1365 return await loop.getaddrinfo(host, port, family=family, type=type, 1366 proto=proto, flags=flags) 1367 1368 async def _create_server_getaddrinfo(self, host, port, family, flags): 1369 infos = await self._ensure_resolved((host, port), family=family, 1370 type=socket.SOCK_STREAM, 1371 flags=flags, loop=self) 1372 if not infos: 1373 raise OSError(f'getaddrinfo({host!r}) returned empty list') 1374 return infos 1375 1376 async def create_server( 1377 self, protocol_factory, host=None, port=None, 1378 *, 1379 family=socket.AF_UNSPEC, 1380 flags=socket.AI_PASSIVE, 1381 sock=None, 1382 backlog=100, 1383 ssl=None, 1384 reuse_address=None, 1385 reuse_port=None, 1386 ssl_handshake_timeout=None, 1387 start_serving=True): 1388 """Create a TCP server. 1389 1390 The host parameter can be a string, in that case the TCP server is 1391 bound to host and port. 1392 1393 The host parameter can also be a sequence of strings and in that case 1394 the TCP server is bound to all hosts of the sequence. If a host 1395 appears multiple times (possibly indirectly e.g. when hostnames 1396 resolve to the same IP address), the server is only bound once to that 1397 host. 1398 1399 Return a Server object which can be used to stop the service. 1400 1401 This method is a coroutine. 1402 """ 1403 if isinstance(ssl, bool): 1404 raise TypeError('ssl argument must be an SSLContext or None') 1405 1406 if ssl_handshake_timeout is not None and ssl is None: 1407 raise ValueError( 1408 'ssl_handshake_timeout is only meaningful with ssl') 1409 1410 if host is not None or port is not None: 1411 if sock is not None: 1412 raise ValueError( 1413 'host/port and sock can not be specified at the same time') 1414 1415 if reuse_address is None: 1416 reuse_address = os.name == 'posix' and sys.platform != 'cygwin' 1417 sockets = [] 1418 if host == '': 1419 hosts = [None] 1420 elif (isinstance(host, str) or 1421 not isinstance(host, collections.abc.Iterable)): 1422 hosts = [host] 1423 else: 1424 hosts = host 1425 1426 fs = [self._create_server_getaddrinfo(host, port, family=family, 1427 flags=flags) 1428 for host in hosts] 1429 infos = await tasks.gather(*fs, loop=self) 1430 infos = set(itertools.chain.from_iterable(infos)) 1431 1432 completed = False 1433 try: 1434 for res in infos: 1435 af, socktype, proto, canonname, sa = res 1436 try: 1437 sock = socket.socket(af, socktype, proto) 1438 except socket.error: 1439 # Assume it's a bad family/type/protocol combination. 1440 if self._debug: 1441 logger.warning('create_server() failed to create ' 1442 'socket.socket(%r, %r, %r)', 1443 af, socktype, proto, exc_info=True) 1444 continue 1445 sockets.append(sock) 1446 if reuse_address: 1447 sock.setsockopt( 1448 socket.SOL_SOCKET, socket.SO_REUSEADDR, True) 1449 if reuse_port: 1450 _set_reuseport(sock) 1451 # Disable IPv4/IPv6 dual stack support (enabled by 1452 # default on Linux) which makes a single socket 1453 # listen on both address families. 1454 if (_HAS_IPv6 and 1455 af == socket.AF_INET6 and 1456 hasattr(socket, 'IPPROTO_IPV6')): 1457 sock.setsockopt(socket.IPPROTO_IPV6, 1458 socket.IPV6_V6ONLY, 1459 True) 1460 try: 1461 sock.bind(sa) 1462 except OSError as err: 1463 raise OSError(err.errno, 'error while attempting ' 1464 'to bind on address %r: %s' 1465 % (sa, err.strerror.lower())) from None 1466 completed = True 1467 finally: 1468 if not completed: 1469 for sock in sockets: 1470 sock.close() 1471 else: 1472 if sock is None: 1473 raise ValueError('Neither host/port nor sock were specified') 1474 if sock.type != socket.SOCK_STREAM: 1475 raise ValueError(f'A Stream Socket was expected, got {sock!r}') 1476 sockets = [sock] 1477 1478 for sock in sockets: 1479 sock.setblocking(False) 1480 1481 server = Server(self, sockets, protocol_factory, 1482 ssl, backlog, ssl_handshake_timeout) 1483 if start_serving: 1484 server._start_serving() 1485 # Skip one loop iteration so that all 'loop.add_reader' 1486 # go through. 1487 await tasks.sleep(0, loop=self) 1488 1489 if self._debug: 1490 logger.info("%r is serving", server) 1491 return server 1492 1493 async def connect_accepted_socket( 1494 self, protocol_factory, sock, 1495 *, ssl=None, 1496 ssl_handshake_timeout=None): 1497 """Handle an accepted connection. 1498 1499 This is used by servers that accept connections outside of 1500 asyncio but that use asyncio to handle connections. 1501 1502 This method is a coroutine. When completed, the coroutine 1503 returns a (transport, protocol) pair. 1504 """ 1505 if sock.type != socket.SOCK_STREAM: 1506 raise ValueError(f'A Stream Socket was expected, got {sock!r}') 1507 1508 if ssl_handshake_timeout is not None and not ssl: 1509 raise ValueError( 1510 'ssl_handshake_timeout is only meaningful with ssl') 1511 1512 transport, protocol = await self._create_connection_transport( 1513 sock, protocol_factory, ssl, '', server_side=True, 1514 ssl_handshake_timeout=ssl_handshake_timeout) 1515 if self._debug: 1516 # Get the socket from the transport because SSL transport closes 1517 # the old socket and creates a new SSL socket 1518 sock = transport.get_extra_info('socket') 1519 logger.debug("%r handled: (%r, %r)", sock, transport, protocol) 1520 return transport, protocol 1521 1522 async def connect_read_pipe(self, protocol_factory, pipe): 1523 protocol = protocol_factory() 1524 waiter = self.create_future() 1525 transport = self._make_read_pipe_transport(pipe, protocol, waiter) 1526 1527 try: 1528 await waiter 1529 except: 1530 transport.close() 1531 raise 1532 1533 if self._debug: 1534 logger.debug('Read pipe %r connected: (%r, %r)', 1535 pipe.fileno(), transport, protocol) 1536 return transport, protocol 1537 1538 async def connect_write_pipe(self, protocol_factory, pipe): 1539 protocol = protocol_factory() 1540 waiter = self.create_future() 1541 transport = self._make_write_pipe_transport(pipe, protocol, waiter) 1542 1543 try: 1544 await waiter 1545 except: 1546 transport.close() 1547 raise 1548 1549 if self._debug: 1550 logger.debug('Write pipe %r connected: (%r, %r)', 1551 pipe.fileno(), transport, protocol) 1552 return transport, protocol 1553 1554 def _log_subprocess(self, msg, stdin, stdout, stderr): 1555 info = [msg] 1556 if stdin is not None: 1557 info.append(f'stdin={_format_pipe(stdin)}') 1558 if stdout is not None and stderr == subprocess.STDOUT: 1559 info.append(f'stdout=stderr={_format_pipe(stdout)}') 1560 else: 1561 if stdout is not None: 1562 info.append(f'stdout={_format_pipe(stdout)}') 1563 if stderr is not None: 1564 info.append(f'stderr={_format_pipe(stderr)}') 1565 logger.debug(' '.join(info)) 1566 1567 async def subprocess_shell(self, protocol_factory, cmd, *, 1568 stdin=subprocess.PIPE, 1569 stdout=subprocess.PIPE, 1570 stderr=subprocess.PIPE, 1571 universal_newlines=False, 1572 shell=True, bufsize=0, 1573 encoding=None, errors=None, text=None, 1574 **kwargs): 1575 if not isinstance(cmd, (bytes, str)): 1576 raise ValueError("cmd must be a string") 1577 if universal_newlines: 1578 raise ValueError("universal_newlines must be False") 1579 if not shell: 1580 raise ValueError("shell must be True") 1581 if bufsize != 0: 1582 raise ValueError("bufsize must be 0") 1583 if text: 1584 raise ValueError("text must be False") 1585 if encoding is not None: 1586 raise ValueError("encoding must be None") 1587 if errors is not None: 1588 raise ValueError("errors must be None") 1589 1590 protocol = protocol_factory() 1591 debug_log = None 1592 if self._debug: 1593 # don't log parameters: they may contain sensitive information 1594 # (password) and may be too long 1595 debug_log = 'run shell command %r' % cmd 1596 self._log_subprocess(debug_log, stdin, stdout, stderr) 1597 transport = await self._make_subprocess_transport( 1598 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs) 1599 if self._debug and debug_log is not None: 1600 logger.info('%s: %r', debug_log, transport) 1601 return transport, protocol 1602 1603 async def subprocess_exec(self, protocol_factory, program, *args, 1604 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 1605 stderr=subprocess.PIPE, universal_newlines=False, 1606 shell=False, bufsize=0, 1607 encoding=None, errors=None, text=None, 1608 **kwargs): 1609 if universal_newlines: 1610 raise ValueError("universal_newlines must be False") 1611 if shell: 1612 raise ValueError("shell must be False") 1613 if bufsize != 0: 1614 raise ValueError("bufsize must be 0") 1615 if text: 1616 raise ValueError("text must be False") 1617 if encoding is not None: 1618 raise ValueError("encoding must be None") 1619 if errors is not None: 1620 raise ValueError("errors must be None") 1621 1622 popen_args = (program,) + args 1623 protocol = protocol_factory() 1624 debug_log = None 1625 if self._debug: 1626 # don't log parameters: they may contain sensitive information 1627 # (password) and may be too long 1628 debug_log = f'execute program {program!r}' 1629 self._log_subprocess(debug_log, stdin, stdout, stderr) 1630 transport = await self._make_subprocess_transport( 1631 protocol, popen_args, False, stdin, stdout, stderr, 1632 bufsize, **kwargs) 1633 if self._debug and debug_log is not None: 1634 logger.info('%s: %r', debug_log, transport) 1635 return transport, protocol 1636 1637 def get_exception_handler(self): 1638 """Return an exception handler, or None if the default one is in use. 1639 """ 1640 return self._exception_handler 1641 1642 def set_exception_handler(self, handler): 1643 """Set handler as the new event loop exception handler. 1644 1645 If handler is None, the default exception handler will 1646 be set. 1647 1648 If handler is a callable object, it should have a 1649 signature matching '(loop, context)', where 'loop' 1650 will be a reference to the active event loop, 'context' 1651 will be a dict object (see `call_exception_handler()` 1652 documentation for details about context). 1653 """ 1654 if handler is not None and not callable(handler): 1655 raise TypeError(f'A callable object or None is expected, ' 1656 f'got {handler!r}') 1657 self._exception_handler = handler 1658 1659 def default_exception_handler(self, context): 1660 """Default exception handler. 1661 1662 This is called when an exception occurs and no exception 1663 handler is set, and can be called by a custom exception 1664 handler that wants to defer to the default behavior. 1665 1666 This default handler logs the error message and other 1667 context-dependent information. In debug mode, a truncated 1668 stack trace is also appended showing where the given object 1669 (e.g. a handle or future or task) was created, if any. 1670 1671 The context parameter has the same meaning as in 1672 `call_exception_handler()`. 1673 """ 1674 message = context.get('message') 1675 if not message: 1676 message = 'Unhandled exception in event loop' 1677 1678 exception = context.get('exception') 1679 if exception is not None: 1680 exc_info = (type(exception), exception, exception.__traceback__) 1681 else: 1682 exc_info = False 1683 1684 if ('source_traceback' not in context and 1685 self._current_handle is not None and 1686 self._current_handle._source_traceback): 1687 context['handle_traceback'] = \ 1688 self._current_handle._source_traceback 1689 1690 log_lines = [message] 1691 for key in sorted(context): 1692 if key in {'message', 'exception'}: 1693 continue 1694 value = context[key] 1695 if key == 'source_traceback': 1696 tb = ''.join(traceback.format_list(value)) 1697 value = 'Object created at (most recent call last):\n' 1698 value += tb.rstrip() 1699 elif key == 'handle_traceback': 1700 tb = ''.join(traceback.format_list(value)) 1701 value = 'Handle created at (most recent call last):\n' 1702 value += tb.rstrip() 1703 else: 1704 value = repr(value) 1705 log_lines.append(f'{key}: {value}') 1706 1707 logger.error('\n'.join(log_lines), exc_info=exc_info) 1708 1709 def call_exception_handler(self, context): 1710 """Call the current event loop's exception handler. 1711 1712 The context argument is a dict containing the following keys: 1713 1714 - 'message': Error message; 1715 - 'exception' (optional): Exception object; 1716 - 'future' (optional): Future instance; 1717 - 'task' (optional): Task instance; 1718 - 'handle' (optional): Handle instance; 1719 - 'protocol' (optional): Protocol instance; 1720 - 'transport' (optional): Transport instance; 1721 - 'socket' (optional): Socket instance; 1722 - 'asyncgen' (optional): Asynchronous generator that caused 1723 the exception. 1724 1725 New keys maybe introduced in the future. 1726 1727 Note: do not overload this method in an event loop subclass. 1728 For custom exception handling, use the 1729 `set_exception_handler()` method. 1730 """ 1731 if self._exception_handler is None: 1732 try: 1733 self.default_exception_handler(context) 1734 except (SystemExit, KeyboardInterrupt): 1735 raise 1736 except BaseException: 1737 # Second protection layer for unexpected errors 1738 # in the default implementation, as well as for subclassed 1739 # event loops with overloaded "default_exception_handler". 1740 logger.error('Exception in default exception handler', 1741 exc_info=True) 1742 else: 1743 try: 1744 self._exception_handler(self, context) 1745 except (SystemExit, KeyboardInterrupt): 1746 raise 1747 except BaseException as exc: 1748 # Exception in the user set custom exception handler. 1749 try: 1750 # Let's try default handler. 1751 self.default_exception_handler({ 1752 'message': 'Unhandled error in exception handler', 1753 'exception': exc, 1754 'context': context, 1755 }) 1756 except (SystemExit, KeyboardInterrupt): 1757 raise 1758 except BaseException: 1759 # Guard 'default_exception_handler' in case it is 1760 # overloaded. 1761 logger.error('Exception in default exception handler ' 1762 'while handling an unexpected error ' 1763 'in custom exception handler', 1764 exc_info=True) 1765 1766 def _add_callback(self, handle): 1767 """Add a Handle to _scheduled (TimerHandle) or _ready.""" 1768 assert isinstance(handle, events.Handle), 'A Handle is required here' 1769 if handle._cancelled: 1770 return 1771 assert not isinstance(handle, events.TimerHandle) 1772 self._ready.append(handle) 1773 1774 def _add_callback_signalsafe(self, handle): 1775 """Like _add_callback() but called from a signal handler.""" 1776 self._add_callback(handle) 1777 self._write_to_self() 1778 1779 def _timer_handle_cancelled(self, handle): 1780 """Notification that a TimerHandle has been cancelled.""" 1781 if handle._scheduled: 1782 self._timer_cancelled_count += 1 1783 1784 def _run_once(self): 1785 """Run one full iteration of the event loop. 1786 1787 This calls all currently ready callbacks, polls for I/O, 1788 schedules the resulting callbacks, and finally schedules 1789 'call_later' callbacks. 1790 """ 1791 1792 sched_count = len(self._scheduled) 1793 if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and 1794 self._timer_cancelled_count / sched_count > 1795 _MIN_CANCELLED_TIMER_HANDLES_FRACTION): 1796 # Remove delayed calls that were cancelled if their number 1797 # is too high 1798 new_scheduled = [] 1799 for handle in self._scheduled: 1800 if handle._cancelled: 1801 handle._scheduled = False 1802 else: 1803 new_scheduled.append(handle) 1804 1805 heapq.heapify(new_scheduled) 1806 self._scheduled = new_scheduled 1807 self._timer_cancelled_count = 0 1808 else: 1809 # Remove delayed calls that were cancelled from head of queue. 1810 while self._scheduled and self._scheduled[0]._cancelled: 1811 self._timer_cancelled_count -= 1 1812 handle = heapq.heappop(self._scheduled) 1813 handle._scheduled = False 1814 1815 timeout = None 1816 if self._ready or self._stopping: 1817 timeout = 0 1818 elif self._scheduled: 1819 # Compute the desired timeout. 1820 when = self._scheduled[0]._when 1821 timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT) 1822 1823 event_list = self._selector.select(timeout) 1824 self._process_events(event_list) 1825 1826 # Handle 'later' callbacks that are ready. 1827 end_time = self.time() + self._clock_resolution 1828 while self._scheduled: 1829 handle = self._scheduled[0] 1830 if handle._when >= end_time: 1831 break 1832 handle = heapq.heappop(self._scheduled) 1833 handle._scheduled = False 1834 self._ready.append(handle) 1835 1836 # This is the only place where callbacks are actually *called*. 1837 # All other places just add them to ready. 1838 # Note: We run all currently scheduled callbacks, but not any 1839 # callbacks scheduled by callbacks run this time around -- 1840 # they will be run the next time (after another I/O poll). 1841 # Use an idiom that is thread-safe without using locks. 1842 ntodo = len(self._ready) 1843 for i in range(ntodo): 1844 handle = self._ready.popleft() 1845 if handle._cancelled: 1846 continue 1847 if self._debug: 1848 try: 1849 self._current_handle = handle 1850 t0 = self.time() 1851 handle._run() 1852 dt = self.time() - t0 1853 if dt >= self.slow_callback_duration: 1854 logger.warning('Executing %s took %.3f seconds', 1855 _format_handle(handle), dt) 1856 finally: 1857 self._current_handle = None 1858 else: 1859 handle._run() 1860 handle = None # Needed to break cycles when an exception occurs. 1861 1862 def _set_coroutine_origin_tracking(self, enabled): 1863 if bool(enabled) == bool(self._coroutine_origin_tracking_enabled): 1864 return 1865 1866 if enabled: 1867 self._coroutine_origin_tracking_saved_depth = ( 1868 sys.get_coroutine_origin_tracking_depth()) 1869 sys.set_coroutine_origin_tracking_depth( 1870 constants.DEBUG_STACK_DEPTH) 1871 else: 1872 sys.set_coroutine_origin_tracking_depth( 1873 self._coroutine_origin_tracking_saved_depth) 1874 1875 self._coroutine_origin_tracking_enabled = enabled 1876 1877 def get_debug(self): 1878 return self._debug 1879 1880 def set_debug(self, enabled): 1881 self._debug = enabled 1882 1883 if self.is_running(): 1884 self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled) 1885