"""Base implementation of event loop.

The event loop can be broken up into a multiplexer (the part
responsible for notifying us of I/O events) and the event loop proper,
which wraps a multiplexer with functionality for scheduling callbacks,
immediately or at a given time in the future.

Whenever a public API takes a callback, subsequent positional
arguments will be passed to the callback if/when it is called.  This
avoids the proliferation of trivial lambdas implementing closures.
Keyword arguments for the callback are not supported; this is a
conscious design decision, leaving the door open for keyword arguments
to modify the meaning of the API call itself.
"""

import collections
import collections.abc
import concurrent.futures
import functools
import heapq
import itertools
import os
import socket
import stat
import subprocess
import threading
import time
import traceback
import sys
import warnings
import weakref

try:
    import ssl
except ImportError:  # pragma: no cover
    ssl = None

from . import constants
from . import coroutines
from . import events
from . import exceptions
from . import futures
from . import protocols
from . import sslproto
from . import staggered
from . import tasks
from . import transports
from . import trsock
from .log import logger


__all__ = 'BaseEventLoop',


# Minimum number of _scheduled timer handles before cleanup of
# cancelled handles is performed.
_MIN_SCHEDULED_TIMER_HANDLES = 100

# Minimum fraction of _scheduled timer handles that are cancelled
# before cleanup of cancelled handles is performed.
61_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5 62 63 64_HAS_IPv6 = hasattr(socket, 'AF_INET6') 65 66# Maximum timeout passed to select to avoid OS limitations 67MAXIMUM_SELECT_TIMEOUT = 24 * 3600 68 69 70def _format_handle(handle): 71 cb = handle._callback 72 if isinstance(getattr(cb, '__self__', None), tasks.Task): 73 # format the task 74 return repr(cb.__self__) 75 else: 76 return str(handle) 77 78 79def _format_pipe(fd): 80 if fd == subprocess.PIPE: 81 return '<pipe>' 82 elif fd == subprocess.STDOUT: 83 return '<stdout>' 84 else: 85 return repr(fd) 86 87 88def _set_reuseport(sock): 89 if not hasattr(socket, 'SO_REUSEPORT'): 90 raise ValueError('reuse_port not supported by socket module') 91 else: 92 try: 93 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) 94 except OSError: 95 raise ValueError('reuse_port not supported by socket module, ' 96 'SO_REUSEPORT defined but not implemented.') 97 98 99def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0): 100 # Try to skip getaddrinfo if "host" is already an IP. Users might have 101 # handled name resolution in their own code and pass in resolved IPs. 102 if not hasattr(socket, 'inet_pton'): 103 return 104 105 if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \ 106 host is None: 107 return None 108 109 if type == socket.SOCK_STREAM: 110 proto = socket.IPPROTO_TCP 111 elif type == socket.SOCK_DGRAM: 112 proto = socket.IPPROTO_UDP 113 else: 114 return None 115 116 if port is None: 117 port = 0 118 elif isinstance(port, bytes) and port == b'': 119 port = 0 120 elif isinstance(port, str) and port == '': 121 port = 0 122 else: 123 # If port's a service name like "http", don't skip getaddrinfo. 
124 try: 125 port = int(port) 126 except (TypeError, ValueError): 127 return None 128 129 if family == socket.AF_UNSPEC: 130 afs = [socket.AF_INET] 131 if _HAS_IPv6: 132 afs.append(socket.AF_INET6) 133 else: 134 afs = [family] 135 136 if isinstance(host, bytes): 137 host = host.decode('idna') 138 if '%' in host: 139 # Linux's inet_pton doesn't accept an IPv6 zone index after host, 140 # like '::1%lo0'. 141 return None 142 143 for af in afs: 144 try: 145 socket.inet_pton(af, host) 146 # The host has already been resolved. 147 if _HAS_IPv6 and af == socket.AF_INET6: 148 return af, type, proto, '', (host, port, flowinfo, scopeid) 149 else: 150 return af, type, proto, '', (host, port) 151 except OSError: 152 pass 153 154 # "host" is not an IP address. 155 return None 156 157 158def _interleave_addrinfos(addrinfos, first_address_family_count=1): 159 """Interleave list of addrinfo tuples by family.""" 160 # Group addresses by family 161 addrinfos_by_family = collections.OrderedDict() 162 for addr in addrinfos: 163 family = addr[0] 164 if family not in addrinfos_by_family: 165 addrinfos_by_family[family] = [] 166 addrinfos_by_family[family].append(addr) 167 addrinfos_lists = list(addrinfos_by_family.values()) 168 169 reordered = [] 170 if first_address_family_count > 1: 171 reordered.extend(addrinfos_lists[0][:first_address_family_count - 1]) 172 del addrinfos_lists[0][:first_address_family_count - 1] 173 reordered.extend( 174 a for a in itertools.chain.from_iterable( 175 itertools.zip_longest(*addrinfos_lists) 176 ) if a is not None) 177 return reordered 178 179 180def _run_until_complete_cb(fut): 181 if not fut.cancelled(): 182 exc = fut.exception() 183 if isinstance(exc, (SystemExit, KeyboardInterrupt)): 184 # Issue #22429: run_forever() already finished, no need to 185 # stop it. 
186 return 187 futures._get_loop(fut).stop() 188 189 190if hasattr(socket, 'TCP_NODELAY'): 191 def _set_nodelay(sock): 192 if (sock.family in {socket.AF_INET, socket.AF_INET6} and 193 sock.type == socket.SOCK_STREAM and 194 sock.proto == socket.IPPROTO_TCP): 195 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 196else: 197 def _set_nodelay(sock): 198 pass 199 200 201class _SendfileFallbackProtocol(protocols.Protocol): 202 def __init__(self, transp): 203 if not isinstance(transp, transports._FlowControlMixin): 204 raise TypeError("transport should be _FlowControlMixin instance") 205 self._transport = transp 206 self._proto = transp.get_protocol() 207 self._should_resume_reading = transp.is_reading() 208 self._should_resume_writing = transp._protocol_paused 209 transp.pause_reading() 210 transp.set_protocol(self) 211 if self._should_resume_writing: 212 self._write_ready_fut = self._transport._loop.create_future() 213 else: 214 self._write_ready_fut = None 215 216 async def drain(self): 217 if self._transport.is_closing(): 218 raise ConnectionError("Connection closed by peer") 219 fut = self._write_ready_fut 220 if fut is None: 221 return 222 await fut 223 224 def connection_made(self, transport): 225 raise RuntimeError("Invalid state: " 226 "connection should have been established already.") 227 228 def connection_lost(self, exc): 229 if self._write_ready_fut is not None: 230 # Never happens if peer disconnects after sending the whole content 231 # Thus disconnection is always an exception from user perspective 232 if exc is None: 233 self._write_ready_fut.set_exception( 234 ConnectionError("Connection is closed by peer")) 235 else: 236 self._write_ready_fut.set_exception(exc) 237 self._proto.connection_lost(exc) 238 239 def pause_writing(self): 240 if self._write_ready_fut is not None: 241 return 242 self._write_ready_fut = self._transport._loop.create_future() 243 244 def resume_writing(self): 245 if self._write_ready_fut is None: 246 return 247 
self._write_ready_fut.set_result(False) 248 self._write_ready_fut = None 249 250 def data_received(self, data): 251 raise RuntimeError("Invalid state: reading should be paused") 252 253 def eof_received(self): 254 raise RuntimeError("Invalid state: reading should be paused") 255 256 async def restore(self): 257 self._transport.set_protocol(self._proto) 258 if self._should_resume_reading: 259 self._transport.resume_reading() 260 if self._write_ready_fut is not None: 261 # Cancel the future. 262 # Basically it has no effect because protocol is switched back, 263 # no code should wait for it anymore. 264 self._write_ready_fut.cancel() 265 if self._should_resume_writing: 266 self._proto.resume_writing() 267 268 269class Server(events.AbstractServer): 270 271 def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog, 272 ssl_handshake_timeout): 273 self._loop = loop 274 self._sockets = sockets 275 self._active_count = 0 276 self._waiters = [] 277 self._protocol_factory = protocol_factory 278 self._backlog = backlog 279 self._ssl_context = ssl_context 280 self._ssl_handshake_timeout = ssl_handshake_timeout 281 self._serving = False 282 self._serving_forever_fut = None 283 284 def __repr__(self): 285 return f'<{self.__class__.__name__} sockets={self.sockets!r}>' 286 287 def _attach(self): 288 assert self._sockets is not None 289 self._active_count += 1 290 291 def _detach(self): 292 assert self._active_count > 0 293 self._active_count -= 1 294 if self._active_count == 0 and self._sockets is None: 295 self._wakeup() 296 297 def _wakeup(self): 298 waiters = self._waiters 299 self._waiters = None 300 for waiter in waiters: 301 if not waiter.done(): 302 waiter.set_result(waiter) 303 304 def _start_serving(self): 305 if self._serving: 306 return 307 self._serving = True 308 for sock in self._sockets: 309 sock.listen(self._backlog) 310 self._loop._start_serving( 311 self._protocol_factory, sock, self._ssl_context, 312 self, self._backlog, 
self._ssl_handshake_timeout) 313 314 def get_loop(self): 315 return self._loop 316 317 def is_serving(self): 318 return self._serving 319 320 @property 321 def sockets(self): 322 if self._sockets is None: 323 return () 324 return tuple(trsock.TransportSocket(s) for s in self._sockets) 325 326 def close(self): 327 sockets = self._sockets 328 if sockets is None: 329 return 330 self._sockets = None 331 332 for sock in sockets: 333 self._loop._stop_serving(sock) 334 335 self._serving = False 336 337 if (self._serving_forever_fut is not None and 338 not self._serving_forever_fut.done()): 339 self._serving_forever_fut.cancel() 340 self._serving_forever_fut = None 341 342 if self._active_count == 0: 343 self._wakeup() 344 345 async def start_serving(self): 346 self._start_serving() 347 # Skip one loop iteration so that all 'loop.add_reader' 348 # go through. 349 await tasks.sleep(0) 350 351 async def serve_forever(self): 352 if self._serving_forever_fut is not None: 353 raise RuntimeError( 354 f'server {self!r} is already being awaited on serve_forever()') 355 if self._sockets is None: 356 raise RuntimeError(f'server {self!r} is closed') 357 358 self._start_serving() 359 self._serving_forever_fut = self._loop.create_future() 360 361 try: 362 await self._serving_forever_fut 363 except exceptions.CancelledError: 364 try: 365 self.close() 366 await self.wait_closed() 367 finally: 368 raise 369 finally: 370 self._serving_forever_fut = None 371 372 async def wait_closed(self): 373 if self._sockets is None or self._waiters is None: 374 return 375 waiter = self._loop.create_future() 376 self._waiters.append(waiter) 377 await waiter 378 379 380class BaseEventLoop(events.AbstractEventLoop): 381 382 def __init__(self): 383 self._timer_cancelled_count = 0 384 self._closed = False 385 self._stopping = False 386 self._ready = collections.deque() 387 self._scheduled = [] 388 self._default_executor = None 389 self._internal_fds = 0 390 # Identifier of the thread running the event loop, 
or None if the 391 # event loop is not running 392 self._thread_id = None 393 self._clock_resolution = time.get_clock_info('monotonic').resolution 394 self._exception_handler = None 395 self.set_debug(coroutines._is_debug_mode()) 396 # In debug mode, if the execution of a callback or a step of a task 397 # exceed this duration in seconds, the slow callback/task is logged. 398 self.slow_callback_duration = 0.1 399 self._current_handle = None 400 self._task_factory = None 401 self._coroutine_origin_tracking_enabled = False 402 self._coroutine_origin_tracking_saved_depth = None 403 404 # A weak set of all asynchronous generators that are 405 # being iterated by the loop. 406 self._asyncgens = weakref.WeakSet() 407 # Set to True when `loop.shutdown_asyncgens` is called. 408 self._asyncgens_shutdown_called = False 409 # Set to True when `loop.shutdown_default_executor` is called. 410 self._executor_shutdown_called = False 411 412 def __repr__(self): 413 return ( 414 f'<{self.__class__.__name__} running={self.is_running()} ' 415 f'closed={self.is_closed()} debug={self.get_debug()}>' 416 ) 417 418 def create_future(self): 419 """Create a Future object attached to the loop.""" 420 return futures.Future(loop=self) 421 422 def create_task(self, coro, *, name=None): 423 """Schedule a coroutine object. 424 425 Return a task object. 426 """ 427 self._check_closed() 428 if self._task_factory is None: 429 task = tasks.Task(coro, loop=self, name=name) 430 if task._source_traceback: 431 del task._source_traceback[-1] 432 else: 433 task = self._task_factory(self, coro) 434 tasks._set_task_name(task, name) 435 436 return task 437 438 def set_task_factory(self, factory): 439 """Set a task factory that will be used by loop.create_task(). 440 441 If factory is None the default task factory will be set. 442 443 If factory is a callable, it should have a signature matching 444 '(loop, coro)', where 'loop' will be a reference to the active 445 event loop, 'coro' will be a coroutine object. 
The callable 446 must return a Future. 447 """ 448 if factory is not None and not callable(factory): 449 raise TypeError('task factory must be a callable or None') 450 self._task_factory = factory 451 452 def get_task_factory(self): 453 """Return a task factory, or None if the default one is in use.""" 454 return self._task_factory 455 456 def _make_socket_transport(self, sock, protocol, waiter=None, *, 457 extra=None, server=None): 458 """Create socket transport.""" 459 raise NotImplementedError 460 461 def _make_ssl_transport( 462 self, rawsock, protocol, sslcontext, waiter=None, 463 *, server_side=False, server_hostname=None, 464 extra=None, server=None, 465 ssl_handshake_timeout=None, 466 call_connection_made=True): 467 """Create SSL transport.""" 468 raise NotImplementedError 469 470 def _make_datagram_transport(self, sock, protocol, 471 address=None, waiter=None, extra=None): 472 """Create datagram transport.""" 473 raise NotImplementedError 474 475 def _make_read_pipe_transport(self, pipe, protocol, waiter=None, 476 extra=None): 477 """Create read pipe transport.""" 478 raise NotImplementedError 479 480 def _make_write_pipe_transport(self, pipe, protocol, waiter=None, 481 extra=None): 482 """Create write pipe transport.""" 483 raise NotImplementedError 484 485 async def _make_subprocess_transport(self, protocol, args, shell, 486 stdin, stdout, stderr, bufsize, 487 extra=None, **kwargs): 488 """Create subprocess transport.""" 489 raise NotImplementedError 490 491 def _write_to_self(self): 492 """Write a byte to self-pipe, to wake up the event loop. 493 494 This may be called from a different thread. 495 496 The subclass is responsible for implementing the self-pipe. 
497 """ 498 raise NotImplementedError 499 500 def _process_events(self, event_list): 501 """Process selector events.""" 502 raise NotImplementedError 503 504 def _check_closed(self): 505 if self._closed: 506 raise RuntimeError('Event loop is closed') 507 508 def _check_default_executor(self): 509 if self._executor_shutdown_called: 510 raise RuntimeError('Executor shutdown has been called') 511 512 def _asyncgen_finalizer_hook(self, agen): 513 self._asyncgens.discard(agen) 514 if not self.is_closed(): 515 self.call_soon_threadsafe(self.create_task, agen.aclose()) 516 517 def _asyncgen_firstiter_hook(self, agen): 518 if self._asyncgens_shutdown_called: 519 warnings.warn( 520 f"asynchronous generator {agen!r} was scheduled after " 521 f"loop.shutdown_asyncgens() call", 522 ResourceWarning, source=self) 523 524 self._asyncgens.add(agen) 525 526 async def shutdown_asyncgens(self): 527 """Shutdown all active asynchronous generators.""" 528 self._asyncgens_shutdown_called = True 529 530 if not len(self._asyncgens): 531 # If Python version is <3.6 or we don't have any asynchronous 532 # generators alive. 
533 return 534 535 closing_agens = list(self._asyncgens) 536 self._asyncgens.clear() 537 538 results = await tasks.gather( 539 *[ag.aclose() for ag in closing_agens], 540 return_exceptions=True) 541 542 for result, agen in zip(results, closing_agens): 543 if isinstance(result, Exception): 544 self.call_exception_handler({ 545 'message': f'an error occurred during closing of ' 546 f'asynchronous generator {agen!r}', 547 'exception': result, 548 'asyncgen': agen 549 }) 550 551 async def shutdown_default_executor(self): 552 """Schedule the shutdown of the default executor.""" 553 self._executor_shutdown_called = True 554 if self._default_executor is None: 555 return 556 future = self.create_future() 557 thread = threading.Thread(target=self._do_shutdown, args=(future,)) 558 thread.start() 559 try: 560 await future 561 finally: 562 thread.join() 563 564 def _do_shutdown(self, future): 565 try: 566 self._default_executor.shutdown(wait=True) 567 self.call_soon_threadsafe(future.set_result, None) 568 except Exception as ex: 569 self.call_soon_threadsafe(future.set_exception, ex) 570 571 def _check_running(self): 572 if self.is_running(): 573 raise RuntimeError('This event loop is already running') 574 if events._get_running_loop() is not None: 575 raise RuntimeError( 576 'Cannot run the event loop while another loop is running') 577 578 def run_forever(self): 579 """Run until stop() is called.""" 580 self._check_closed() 581 self._check_running() 582 self._set_coroutine_origin_tracking(self._debug) 583 self._thread_id = threading.get_ident() 584 585 old_agen_hooks = sys.get_asyncgen_hooks() 586 sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook, 587 finalizer=self._asyncgen_finalizer_hook) 588 try: 589 events._set_running_loop(self) 590 while True: 591 self._run_once() 592 if self._stopping: 593 break 594 finally: 595 self._stopping = False 596 self._thread_id = None 597 events._set_running_loop(None) 598 self._set_coroutine_origin_tracking(False) 599 
sys.set_asyncgen_hooks(*old_agen_hooks) 600 601 def run_until_complete(self, future): 602 """Run until the Future is done. 603 604 If the argument is a coroutine, it is wrapped in a Task. 605 606 WARNING: It would be disastrous to call run_until_complete() 607 with the same coroutine twice -- it would wrap it in two 608 different Tasks and that can't be good. 609 610 Return the Future's result, or raise its exception. 611 """ 612 self._check_closed() 613 self._check_running() 614 615 new_task = not futures.isfuture(future) 616 future = tasks.ensure_future(future, loop=self) 617 if new_task: 618 # An exception is raised if the future didn't complete, so there 619 # is no need to log the "destroy pending task" message 620 future._log_destroy_pending = False 621 622 future.add_done_callback(_run_until_complete_cb) 623 try: 624 self.run_forever() 625 except: 626 if new_task and future.done() and not future.cancelled(): 627 # The coroutine raised a BaseException. Consume the exception 628 # to not log a warning, the caller doesn't have access to the 629 # local task. 630 future.exception() 631 raise 632 finally: 633 future.remove_done_callback(_run_until_complete_cb) 634 if not future.done(): 635 raise RuntimeError('Event loop stopped before Future completed.') 636 637 return future.result() 638 639 def stop(self): 640 """Stop running the event loop. 641 642 Every callback already scheduled will still run. This simply informs 643 run_forever to stop looping after a complete iteration. 644 """ 645 self._stopping = True 646 647 def close(self): 648 """Close the event loop. 649 650 This clears the queues and shuts down the executor, 651 but does not wait for the executor to finish. 652 653 The event loop must not be running. 
654 """ 655 if self.is_running(): 656 raise RuntimeError("Cannot close a running event loop") 657 if self._closed: 658 return 659 if self._debug: 660 logger.debug("Close %r", self) 661 self._closed = True 662 self._ready.clear() 663 self._scheduled.clear() 664 self._executor_shutdown_called = True 665 executor = self._default_executor 666 if executor is not None: 667 self._default_executor = None 668 executor.shutdown(wait=False) 669 670 def is_closed(self): 671 """Returns True if the event loop was closed.""" 672 return self._closed 673 674 def __del__(self, _warn=warnings.warn): 675 if not self.is_closed(): 676 _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self) 677 if not self.is_running(): 678 self.close() 679 680 def is_running(self): 681 """Returns True if the event loop is running.""" 682 return (self._thread_id is not None) 683 684 def time(self): 685 """Return the time according to the event loop's clock. 686 687 This is a float expressed in seconds since an epoch, but the 688 epoch, precision, accuracy and drift are unspecified and may 689 differ per event loop. 690 """ 691 return time.monotonic() 692 693 def call_later(self, delay, callback, *args, context=None): 694 """Arrange for a callback to be called at a given time. 695 696 Return a Handle: an opaque object with a cancel() method that 697 can be used to cancel the call. 698 699 The delay can be an int or float, expressed in seconds. It is 700 always relative to the current time. 701 702 Each callback will be called exactly once. If two callbacks 703 are scheduled for exactly the same time, it undefined which 704 will be called first. 705 706 Any positional arguments after the callback will be passed to 707 the callback when it is called. 
708 """ 709 if delay is None: 710 raise TypeError('delay must not be None') 711 timer = self.call_at(self.time() + delay, callback, *args, 712 context=context) 713 if timer._source_traceback: 714 del timer._source_traceback[-1] 715 return timer 716 717 def call_at(self, when, callback, *args, context=None): 718 """Like call_later(), but uses an absolute time. 719 720 Absolute time corresponds to the event loop's time() method. 721 """ 722 if when is None: 723 raise TypeError("when cannot be None") 724 self._check_closed() 725 if self._debug: 726 self._check_thread() 727 self._check_callback(callback, 'call_at') 728 timer = events.TimerHandle(when, callback, args, self, context) 729 if timer._source_traceback: 730 del timer._source_traceback[-1] 731 heapq.heappush(self._scheduled, timer) 732 timer._scheduled = True 733 return timer 734 735 def call_soon(self, callback, *args, context=None): 736 """Arrange for a callback to be called as soon as possible. 737 738 This operates as a FIFO queue: callbacks are called in the 739 order in which they are registered. Each callback will be 740 called exactly once. 741 742 Any positional arguments after the callback will be passed to 743 the callback when it is called. 
744 """ 745 self._check_closed() 746 if self._debug: 747 self._check_thread() 748 self._check_callback(callback, 'call_soon') 749 handle = self._call_soon(callback, args, context) 750 if handle._source_traceback: 751 del handle._source_traceback[-1] 752 return handle 753 754 def _check_callback(self, callback, method): 755 if (coroutines.iscoroutine(callback) or 756 coroutines.iscoroutinefunction(callback)): 757 raise TypeError( 758 f"coroutines cannot be used with {method}()") 759 if not callable(callback): 760 raise TypeError( 761 f'a callable object was expected by {method}(), ' 762 f'got {callback!r}') 763 764 def _call_soon(self, callback, args, context): 765 handle = events.Handle(callback, args, self, context) 766 if handle._source_traceback: 767 del handle._source_traceback[-1] 768 self._ready.append(handle) 769 return handle 770 771 def _check_thread(self): 772 """Check that the current thread is the thread running the event loop. 773 774 Non-thread-safe methods of this class make this assumption and will 775 likely behave incorrectly when the assumption is violated. 776 777 Should only be called when (self._debug == True). The caller is 778 responsible for checking this condition for performance reasons. 
779 """ 780 if self._thread_id is None: 781 return 782 thread_id = threading.get_ident() 783 if thread_id != self._thread_id: 784 raise RuntimeError( 785 "Non-thread-safe operation invoked on an event loop other " 786 "than the current one") 787 788 def call_soon_threadsafe(self, callback, *args, context=None): 789 """Like call_soon(), but thread-safe.""" 790 self._check_closed() 791 if self._debug: 792 self._check_callback(callback, 'call_soon_threadsafe') 793 handle = self._call_soon(callback, args, context) 794 if handle._source_traceback: 795 del handle._source_traceback[-1] 796 self._write_to_self() 797 return handle 798 799 def run_in_executor(self, executor, func, *args): 800 self._check_closed() 801 if self._debug: 802 self._check_callback(func, 'run_in_executor') 803 if executor is None: 804 executor = self._default_executor 805 # Only check when the default executor is being used 806 self._check_default_executor() 807 if executor is None: 808 executor = concurrent.futures.ThreadPoolExecutor( 809 thread_name_prefix='asyncio' 810 ) 811 self._default_executor = executor 812 return futures.wrap_future( 813 executor.submit(func, *args), loop=self) 814 815 def set_default_executor(self, executor): 816 if not isinstance(executor, concurrent.futures.ThreadPoolExecutor): 817 raise TypeError('executor must be ThreadPoolExecutor instance') 818 self._default_executor = executor 819 820 def _getaddrinfo_debug(self, host, port, family, type, proto, flags): 821 msg = [f"{host}:{port!r}"] 822 if family: 823 msg.append(f'family={family!r}') 824 if type: 825 msg.append(f'type={type!r}') 826 if proto: 827 msg.append(f'proto={proto!r}') 828 if flags: 829 msg.append(f'flags={flags!r}') 830 msg = ', '.join(msg) 831 logger.debug('Get address info %s', msg) 832 833 t0 = self.time() 834 addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags) 835 dt = self.time() - t0 836 837 msg = f'Getting address info {msg} took {dt * 1e3:.3f}ms: {addrinfo!r}' 838 if dt >= 
self.slow_callback_duration: 839 logger.info(msg) 840 else: 841 logger.debug(msg) 842 return addrinfo 843 844 async def getaddrinfo(self, host, port, *, 845 family=0, type=0, proto=0, flags=0): 846 if self._debug: 847 getaddr_func = self._getaddrinfo_debug 848 else: 849 getaddr_func = socket.getaddrinfo 850 851 return await self.run_in_executor( 852 None, getaddr_func, host, port, family, type, proto, flags) 853 854 async def getnameinfo(self, sockaddr, flags=0): 855 return await self.run_in_executor( 856 None, socket.getnameinfo, sockaddr, flags) 857 858 async def sock_sendfile(self, sock, file, offset=0, count=None, 859 *, fallback=True): 860 if self._debug and sock.gettimeout() != 0: 861 raise ValueError("the socket must be non-blocking") 862 self._check_sendfile_params(sock, file, offset, count) 863 try: 864 return await self._sock_sendfile_native(sock, file, 865 offset, count) 866 except exceptions.SendfileNotAvailableError as exc: 867 if not fallback: 868 raise 869 return await self._sock_sendfile_fallback(sock, file, 870 offset, count) 871 872 async def _sock_sendfile_native(self, sock, file, offset, count): 873 # NB: sendfile syscall is not supported for SSL sockets and 874 # non-mmap files even if sendfile is supported by OS 875 raise exceptions.SendfileNotAvailableError( 876 f"syscall sendfile is not available for socket {sock!r} " 877 "and file {file!r} combination") 878 879 async def _sock_sendfile_fallback(self, sock, file, offset, count): 880 if offset: 881 file.seek(offset) 882 blocksize = ( 883 min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE) 884 if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE 885 ) 886 buf = bytearray(blocksize) 887 total_sent = 0 888 try: 889 while True: 890 if count: 891 blocksize = min(count - total_sent, blocksize) 892 if blocksize <= 0: 893 break 894 view = memoryview(buf)[:blocksize] 895 read = await self.run_in_executor(None, file.readinto, view) 896 if not read: 897 break # EOF 898 await 
self.sock_sendall(sock, view[:read]) 899 total_sent += read 900 return total_sent 901 finally: 902 if total_sent > 0 and hasattr(file, 'seek'): 903 file.seek(offset + total_sent) 904 905 def _check_sendfile_params(self, sock, file, offset, count): 906 if 'b' not in getattr(file, 'mode', 'b'): 907 raise ValueError("file should be opened in binary mode") 908 if not sock.type == socket.SOCK_STREAM: 909 raise ValueError("only SOCK_STREAM type sockets are supported") 910 if count is not None: 911 if not isinstance(count, int): 912 raise TypeError( 913 "count must be a positive integer (got {!r})".format(count)) 914 if count <= 0: 915 raise ValueError( 916 "count must be a positive integer (got {!r})".format(count)) 917 if not isinstance(offset, int): 918 raise TypeError( 919 "offset must be a non-negative integer (got {!r})".format( 920 offset)) 921 if offset < 0: 922 raise ValueError( 923 "offset must be a non-negative integer (got {!r})".format( 924 offset)) 925 926 async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None): 927 """Create, bind and connect one socket.""" 928 my_exceptions = [] 929 exceptions.append(my_exceptions) 930 family, type_, proto, _, address = addr_info 931 sock = None 932 try: 933 sock = socket.socket(family=family, type=type_, proto=proto) 934 sock.setblocking(False) 935 if local_addr_infos is not None: 936 for _, _, _, _, laddr in local_addr_infos: 937 try: 938 sock.bind(laddr) 939 break 940 except OSError as exc: 941 msg = ( 942 f'error while attempting to bind on ' 943 f'address {laddr!r}: ' 944 f'{exc.strerror.lower()}' 945 ) 946 exc = OSError(exc.errno, msg) 947 my_exceptions.append(exc) 948 else: # all bind attempts failed 949 raise my_exceptions.pop() 950 await self.sock_connect(sock, address) 951 return sock 952 except OSError as exc: 953 my_exceptions.append(exc) 954 if sock is not None: 955 sock.close() 956 raise 957 except: 958 if sock is not None: 959 sock.close() 960 raise 961 962 async def create_connection( 
963 self, protocol_factory, host=None, port=None, 964 *, ssl=None, family=0, 965 proto=0, flags=0, sock=None, 966 local_addr=None, server_hostname=None, 967 ssl_handshake_timeout=None, 968 happy_eyeballs_delay=None, interleave=None): 969 """Connect to a TCP server. 970 971 Create a streaming transport connection to a given internet host and 972 port: socket family AF_INET or socket.AF_INET6 depending on host (or 973 family if specified), socket type SOCK_STREAM. protocol_factory must be 974 a callable returning a protocol instance. 975 976 This method is a coroutine which will try to establish the connection 977 in the background. When successful, the coroutine returns a 978 (transport, protocol) pair. 979 """ 980 if server_hostname is not None and not ssl: 981 raise ValueError('server_hostname is only meaningful with ssl') 982 983 if server_hostname is None and ssl: 984 # Use host as default for server_hostname. It is an error 985 # if host is empty or not set, e.g. when an 986 # already-connected socket was passed or when only a port 987 # is given. To avoid this error, you can pass 988 # server_hostname='' -- this will bypass the hostname 989 # check. (This also means that if host is a numeric 990 # IP/IPv6 address, we will attempt to verify that exact 991 # address; this will probably fail, but it is possible to 992 # create a certificate for a specific IP address, so we 993 # don't judge it here.) 
994 if not host: 995 raise ValueError('You must set server_hostname ' 996 'when using ssl without a host') 997 server_hostname = host 998 999 if ssl_handshake_timeout is not None and not ssl: 1000 raise ValueError( 1001 'ssl_handshake_timeout is only meaningful with ssl') 1002 1003 if happy_eyeballs_delay is not None and interleave is None: 1004 # If using happy eyeballs, default to interleave addresses by family 1005 interleave = 1 1006 1007 if host is not None or port is not None: 1008 if sock is not None: 1009 raise ValueError( 1010 'host/port and sock can not be specified at the same time') 1011 1012 infos = await self._ensure_resolved( 1013 (host, port), family=family, 1014 type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self) 1015 if not infos: 1016 raise OSError('getaddrinfo() returned empty list') 1017 1018 if local_addr is not None: 1019 laddr_infos = await self._ensure_resolved( 1020 local_addr, family=family, 1021 type=socket.SOCK_STREAM, proto=proto, 1022 flags=flags, loop=self) 1023 if not laddr_infos: 1024 raise OSError('getaddrinfo() returned empty list') 1025 else: 1026 laddr_infos = None 1027 1028 if interleave: 1029 infos = _interleave_addrinfos(infos, interleave) 1030 1031 exceptions = [] 1032 if happy_eyeballs_delay is None: 1033 # not using happy eyeballs 1034 for addrinfo in infos: 1035 try: 1036 sock = await self._connect_sock( 1037 exceptions, addrinfo, laddr_infos) 1038 break 1039 except OSError: 1040 continue 1041 else: # using happy eyeballs 1042 sock, _, _ = await staggered.staggered_race( 1043 (functools.partial(self._connect_sock, 1044 exceptions, addrinfo, laddr_infos) 1045 for addrinfo in infos), 1046 happy_eyeballs_delay, loop=self) 1047 1048 if sock is None: 1049 exceptions = [exc for sub in exceptions for exc in sub] 1050 if len(exceptions) == 1: 1051 raise exceptions[0] 1052 else: 1053 # If they all have the same str(), raise one. 
1054 model = str(exceptions[0]) 1055 if all(str(exc) == model for exc in exceptions): 1056 raise exceptions[0] 1057 # Raise a combined exception so the user can see all 1058 # the various error messages. 1059 raise OSError('Multiple exceptions: {}'.format( 1060 ', '.join(str(exc) for exc in exceptions))) 1061 1062 else: 1063 if sock is None: 1064 raise ValueError( 1065 'host and port was not specified and no sock specified') 1066 if sock.type != socket.SOCK_STREAM: 1067 # We allow AF_INET, AF_INET6, AF_UNIX as long as they 1068 # are SOCK_STREAM. 1069 # We support passing AF_UNIX sockets even though we have 1070 # a dedicated API for that: create_unix_connection. 1071 # Disallowing AF_UNIX in this method, breaks backwards 1072 # compatibility. 1073 raise ValueError( 1074 f'A Stream Socket was expected, got {sock!r}') 1075 1076 transport, protocol = await self._create_connection_transport( 1077 sock, protocol_factory, ssl, server_hostname, 1078 ssl_handshake_timeout=ssl_handshake_timeout) 1079 if self._debug: 1080 # Get the socket from the transport because SSL transport closes 1081 # the old socket and creates a new SSL socket 1082 sock = transport.get_extra_info('socket') 1083 logger.debug("%r connected to %s:%r: (%r, %r)", 1084 sock, host, port, transport, protocol) 1085 return transport, protocol 1086 1087 async def _create_connection_transport( 1088 self, sock, protocol_factory, ssl, 1089 server_hostname, server_side=False, 1090 ssl_handshake_timeout=None): 1091 1092 sock.setblocking(False) 1093 1094 protocol = protocol_factory() 1095 waiter = self.create_future() 1096 if ssl: 1097 sslcontext = None if isinstance(ssl, bool) else ssl 1098 transport = self._make_ssl_transport( 1099 sock, protocol, sslcontext, waiter, 1100 server_side=server_side, server_hostname=server_hostname, 1101 ssl_handshake_timeout=ssl_handshake_timeout) 1102 else: 1103 transport = self._make_socket_transport(sock, protocol, waiter) 1104 1105 try: 1106 await waiter 1107 except: 1108 
transport.close() 1109 raise 1110 1111 return transport, protocol 1112 1113 async def sendfile(self, transport, file, offset=0, count=None, 1114 *, fallback=True): 1115 """Send a file to transport. 1116 1117 Return the total number of bytes which were sent. 1118 1119 The method uses high-performance os.sendfile if available. 1120 1121 file must be a regular file object opened in binary mode. 1122 1123 offset tells from where to start reading the file. If specified, 1124 count is the total number of bytes to transmit as opposed to 1125 sending the file until EOF is reached. File position is updated on 1126 return or also in case of error in which case file.tell() 1127 can be used to figure out the number of bytes 1128 which were sent. 1129 1130 fallback set to True makes asyncio to manually read and send 1131 the file when the platform does not support the sendfile syscall 1132 (e.g. Windows or SSL socket on Unix). 1133 1134 Raise SendfileNotAvailableError if the system does not support 1135 sendfile syscall and fallback is False. 
1136 """ 1137 if transport.is_closing(): 1138 raise RuntimeError("Transport is closing") 1139 mode = getattr(transport, '_sendfile_compatible', 1140 constants._SendfileMode.UNSUPPORTED) 1141 if mode is constants._SendfileMode.UNSUPPORTED: 1142 raise RuntimeError( 1143 f"sendfile is not supported for transport {transport!r}") 1144 if mode is constants._SendfileMode.TRY_NATIVE: 1145 try: 1146 return await self._sendfile_native(transport, file, 1147 offset, count) 1148 except exceptions.SendfileNotAvailableError as exc: 1149 if not fallback: 1150 raise 1151 1152 if not fallback: 1153 raise RuntimeError( 1154 f"fallback is disabled and native sendfile is not " 1155 f"supported for transport {transport!r}") 1156 1157 return await self._sendfile_fallback(transport, file, 1158 offset, count) 1159 1160 async def _sendfile_native(self, transp, file, offset, count): 1161 raise exceptions.SendfileNotAvailableError( 1162 "sendfile syscall is not supported") 1163 1164 async def _sendfile_fallback(self, transp, file, offset, count): 1165 if offset: 1166 file.seek(offset) 1167 blocksize = min(count, 16384) if count else 16384 1168 buf = bytearray(blocksize) 1169 total_sent = 0 1170 proto = _SendfileFallbackProtocol(transp) 1171 try: 1172 while True: 1173 if count: 1174 blocksize = min(count - total_sent, blocksize) 1175 if blocksize <= 0: 1176 return total_sent 1177 view = memoryview(buf)[:blocksize] 1178 read = await self.run_in_executor(None, file.readinto, view) 1179 if not read: 1180 return total_sent # EOF 1181 await proto.drain() 1182 transp.write(view[:read]) 1183 total_sent += read 1184 finally: 1185 if total_sent > 0 and hasattr(file, 'seek'): 1186 file.seek(offset + total_sent) 1187 await proto.restore() 1188 1189 async def start_tls(self, transport, protocol, sslcontext, *, 1190 server_side=False, 1191 server_hostname=None, 1192 ssl_handshake_timeout=None): 1193 """Upgrade transport to TLS. 
1194 1195 Return a new transport that *protocol* should start using 1196 immediately. 1197 """ 1198 if ssl is None: 1199 raise RuntimeError('Python ssl module is not available') 1200 1201 if not isinstance(sslcontext, ssl.SSLContext): 1202 raise TypeError( 1203 f'sslcontext is expected to be an instance of ssl.SSLContext, ' 1204 f'got {sslcontext!r}') 1205 1206 if not getattr(transport, '_start_tls_compatible', False): 1207 raise TypeError( 1208 f'transport {transport!r} is not supported by start_tls()') 1209 1210 waiter = self.create_future() 1211 ssl_protocol = sslproto.SSLProtocol( 1212 self, protocol, sslcontext, waiter, 1213 server_side, server_hostname, 1214 ssl_handshake_timeout=ssl_handshake_timeout, 1215 call_connection_made=False) 1216 1217 # Pause early so that "ssl_protocol.data_received()" doesn't 1218 # have a chance to get called before "ssl_protocol.connection_made()". 1219 transport.pause_reading() 1220 1221 transport.set_protocol(ssl_protocol) 1222 conmade_cb = self.call_soon(ssl_protocol.connection_made, transport) 1223 resume_cb = self.call_soon(transport.resume_reading) 1224 1225 try: 1226 await waiter 1227 except BaseException: 1228 transport.close() 1229 conmade_cb.cancel() 1230 resume_cb.cancel() 1231 raise 1232 1233 return ssl_protocol._app_transport 1234 1235 async def create_datagram_endpoint(self, protocol_factory, 1236 local_addr=None, remote_addr=None, *, 1237 family=0, proto=0, flags=0, 1238 reuse_port=None, 1239 allow_broadcast=None, sock=None): 1240 """Create datagram connection.""" 1241 if sock is not None: 1242 if sock.type != socket.SOCK_DGRAM: 1243 raise ValueError( 1244 f'A UDP Socket was expected, got {sock!r}') 1245 if (local_addr or remote_addr or 1246 family or proto or flags or 1247 reuse_port or allow_broadcast): 1248 # show the problematic kwargs in exception msg 1249 opts = dict(local_addr=local_addr, remote_addr=remote_addr, 1250 family=family, proto=proto, flags=flags, 1251 reuse_port=reuse_port, 1252 
allow_broadcast=allow_broadcast) 1253 problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v) 1254 raise ValueError( 1255 f'socket modifier keyword arguments can not be used ' 1256 f'when sock is specified. ({problems})') 1257 sock.setblocking(False) 1258 r_addr = None 1259 else: 1260 if not (local_addr or remote_addr): 1261 if family == 0: 1262 raise ValueError('unexpected address family') 1263 addr_pairs_info = (((family, proto), (None, None)),) 1264 elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX: 1265 for addr in (local_addr, remote_addr): 1266 if addr is not None and not isinstance(addr, str): 1267 raise TypeError('string is expected') 1268 1269 if local_addr and local_addr[0] not in (0, '\x00'): 1270 try: 1271 if stat.S_ISSOCK(os.stat(local_addr).st_mode): 1272 os.remove(local_addr) 1273 except FileNotFoundError: 1274 pass 1275 except OSError as err: 1276 # Directory may have permissions only to create socket. 1277 logger.error('Unable to check or remove stale UNIX ' 1278 'socket %r: %r', 1279 local_addr, err) 1280 1281 addr_pairs_info = (((family, proto), 1282 (local_addr, remote_addr)), ) 1283 else: 1284 # join address by (family, protocol) 1285 addr_infos = {} # Using order preserving dict 1286 for idx, addr in ((0, local_addr), (1, remote_addr)): 1287 if addr is not None: 1288 assert isinstance(addr, tuple) and len(addr) == 2, ( 1289 '2-tuple is expected') 1290 1291 infos = await self._ensure_resolved( 1292 addr, family=family, type=socket.SOCK_DGRAM, 1293 proto=proto, flags=flags, loop=self) 1294 if not infos: 1295 raise OSError('getaddrinfo() returned empty list') 1296 1297 for fam, _, pro, _, address in infos: 1298 key = (fam, pro) 1299 if key not in addr_infos: 1300 addr_infos[key] = [None, None] 1301 addr_infos[key][idx] = address 1302 1303 # each addr has to have info for each (family, proto) pair 1304 addr_pairs_info = [ 1305 (key, addr_pair) for key, addr_pair in addr_infos.items() 1306 if not ((local_addr and addr_pair[0] is 
None) or 1307 (remote_addr and addr_pair[1] is None))] 1308 1309 if not addr_pairs_info: 1310 raise ValueError('can not get address information') 1311 1312 exceptions = [] 1313 1314 for ((family, proto), 1315 (local_address, remote_address)) in addr_pairs_info: 1316 sock = None 1317 r_addr = None 1318 try: 1319 sock = socket.socket( 1320 family=family, type=socket.SOCK_DGRAM, proto=proto) 1321 if reuse_port: 1322 _set_reuseport(sock) 1323 if allow_broadcast: 1324 sock.setsockopt( 1325 socket.SOL_SOCKET, socket.SO_BROADCAST, 1) 1326 sock.setblocking(False) 1327 1328 if local_addr: 1329 sock.bind(local_address) 1330 if remote_addr: 1331 if not allow_broadcast: 1332 await self.sock_connect(sock, remote_address) 1333 r_addr = remote_address 1334 except OSError as exc: 1335 if sock is not None: 1336 sock.close() 1337 exceptions.append(exc) 1338 except: 1339 if sock is not None: 1340 sock.close() 1341 raise 1342 else: 1343 break 1344 else: 1345 raise exceptions[0] 1346 1347 protocol = protocol_factory() 1348 waiter = self.create_future() 1349 transport = self._make_datagram_transport( 1350 sock, protocol, r_addr, waiter) 1351 if self._debug: 1352 if local_addr: 1353 logger.info("Datagram endpoint local_addr=%r remote_addr=%r " 1354 "created: (%r, %r)", 1355 local_addr, remote_addr, transport, protocol) 1356 else: 1357 logger.debug("Datagram endpoint remote_addr=%r created: " 1358 "(%r, %r)", 1359 remote_addr, transport, protocol) 1360 1361 try: 1362 await waiter 1363 except: 1364 transport.close() 1365 raise 1366 1367 return transport, protocol 1368 1369 async def _ensure_resolved(self, address, *, 1370 family=0, type=socket.SOCK_STREAM, 1371 proto=0, flags=0, loop): 1372 host, port = address[:2] 1373 info = _ipaddr_info(host, port, family, type, proto, *address[2:]) 1374 if info is not None: 1375 # "host" is already a resolved IP. 
1376 return [info] 1377 else: 1378 return await loop.getaddrinfo(host, port, family=family, type=type, 1379 proto=proto, flags=flags) 1380 1381 async def _create_server_getaddrinfo(self, host, port, family, flags): 1382 infos = await self._ensure_resolved((host, port), family=family, 1383 type=socket.SOCK_STREAM, 1384 flags=flags, loop=self) 1385 if not infos: 1386 raise OSError(f'getaddrinfo({host!r}) returned empty list') 1387 return infos 1388 1389 async def create_server( 1390 self, protocol_factory, host=None, port=None, 1391 *, 1392 family=socket.AF_UNSPEC, 1393 flags=socket.AI_PASSIVE, 1394 sock=None, 1395 backlog=100, 1396 ssl=None, 1397 reuse_port=None, 1398 ssl_handshake_timeout=None, 1399 start_serving=True): 1400 """Create a TCP server. 1401 1402 The host parameter can be a string, in that case the TCP server is 1403 bound to host and port. 1404 1405 The host parameter can also be a sequence of strings and in that case 1406 the TCP server is bound to all hosts of the sequence. If a host 1407 appears multiple times (possibly indirectly e.g. when hostnames 1408 resolve to the same IP address), the server is only bound once to that 1409 host. 1410 1411 Return a Server object which can be used to stop the service. 1412 1413 This method is a coroutine. 
1414 """ 1415 if isinstance(ssl, bool): 1416 raise TypeError('ssl argument must be an SSLContext or None') 1417 1418 if ssl_handshake_timeout is not None and ssl is None: 1419 raise ValueError( 1420 'ssl_handshake_timeout is only meaningful with ssl') 1421 1422 if host is not None or port is not None: 1423 if sock is not None: 1424 raise ValueError( 1425 'host/port and sock can not be specified at the same time') 1426 1427 sockets = [] 1428 if host == '': 1429 hosts = [None] 1430 elif (isinstance(host, str) or 1431 not isinstance(host, collections.abc.Iterable)): 1432 hosts = [host] 1433 else: 1434 hosts = host 1435 1436 fs = [self._create_server_getaddrinfo(host, port, family=family, 1437 flags=flags) 1438 for host in hosts] 1439 infos = await tasks.gather(*fs) 1440 infos = set(itertools.chain.from_iterable(infos)) 1441 1442 completed = False 1443 try: 1444 for res in infos: 1445 af, socktype, proto, canonname, sa = res 1446 try: 1447 sock = socket.socket(af, socktype, proto) 1448 except socket.error: 1449 # Assume it's a bad family/type/protocol combination. 1450 if self._debug: 1451 logger.warning('create_server() failed to create ' 1452 'socket.socket(%r, %r, %r)', 1453 af, socktype, proto, exc_info=True) 1454 continue 1455 sockets.append(sock) 1456 if reuse_port: 1457 _set_reuseport(sock) 1458 # Disable IPv4/IPv6 dual stack support (enabled by 1459 # default on Linux) which makes a single socket 1460 # listen on both address families. 
1461 if (_HAS_IPv6 and 1462 af == socket.AF_INET6 and 1463 hasattr(socket, 'IPPROTO_IPV6')): 1464 sock.setsockopt(socket.IPPROTO_IPV6, 1465 socket.IPV6_V6ONLY, 1466 True) 1467 try: 1468 sock.bind(sa) 1469 except OSError as err: 1470 raise OSError(err.errno, 'error while attempting ' 1471 'to bind on address %r: %s' 1472 % (sa, err.strerror.lower())) from None 1473 completed = True 1474 finally: 1475 if not completed: 1476 for sock in sockets: 1477 sock.close() 1478 else: 1479 if sock is None: 1480 raise ValueError('Neither host/port nor sock were specified') 1481 if sock.type != socket.SOCK_STREAM: 1482 raise ValueError(f'A Stream Socket was expected, got {sock!r}') 1483 sockets = [sock] 1484 1485 for sock in sockets: 1486 sock.setblocking(False) 1487 1488 server = Server(self, sockets, protocol_factory, 1489 ssl, backlog, ssl_handshake_timeout) 1490 if start_serving: 1491 server._start_serving() 1492 # Skip one loop iteration so that all 'loop.add_reader' 1493 # go through. 1494 await tasks.sleep(0) 1495 1496 if self._debug: 1497 logger.info("%r is serving", server) 1498 return server 1499 1500 async def connect_accepted_socket( 1501 self, protocol_factory, sock, 1502 *, ssl=None, 1503 ssl_handshake_timeout=None): 1504 if sock.type != socket.SOCK_STREAM: 1505 raise ValueError(f'A Stream Socket was expected, got {sock!r}') 1506 1507 if ssl_handshake_timeout is not None and not ssl: 1508 raise ValueError( 1509 'ssl_handshake_timeout is only meaningful with ssl') 1510 1511 transport, protocol = await self._create_connection_transport( 1512 sock, protocol_factory, ssl, '', server_side=True, 1513 ssl_handshake_timeout=ssl_handshake_timeout) 1514 if self._debug: 1515 # Get the socket from the transport because SSL transport closes 1516 # the old socket and creates a new SSL socket 1517 sock = transport.get_extra_info('socket') 1518 logger.debug("%r handled: (%r, %r)", sock, transport, protocol) 1519 return transport, protocol 1520 1521 async def connect_read_pipe(self, 
protocol_factory, pipe): 1522 protocol = protocol_factory() 1523 waiter = self.create_future() 1524 transport = self._make_read_pipe_transport(pipe, protocol, waiter) 1525 1526 try: 1527 await waiter 1528 except: 1529 transport.close() 1530 raise 1531 1532 if self._debug: 1533 logger.debug('Read pipe %r connected: (%r, %r)', 1534 pipe.fileno(), transport, protocol) 1535 return transport, protocol 1536 1537 async def connect_write_pipe(self, protocol_factory, pipe): 1538 protocol = protocol_factory() 1539 waiter = self.create_future() 1540 transport = self._make_write_pipe_transport(pipe, protocol, waiter) 1541 1542 try: 1543 await waiter 1544 except: 1545 transport.close() 1546 raise 1547 1548 if self._debug: 1549 logger.debug('Write pipe %r connected: (%r, %r)', 1550 pipe.fileno(), transport, protocol) 1551 return transport, protocol 1552 1553 def _log_subprocess(self, msg, stdin, stdout, stderr): 1554 info = [msg] 1555 if stdin is not None: 1556 info.append(f'stdin={_format_pipe(stdin)}') 1557 if stdout is not None and stderr == subprocess.STDOUT: 1558 info.append(f'stdout=stderr={_format_pipe(stdout)}') 1559 else: 1560 if stdout is not None: 1561 info.append(f'stdout={_format_pipe(stdout)}') 1562 if stderr is not None: 1563 info.append(f'stderr={_format_pipe(stderr)}') 1564 logger.debug(' '.join(info)) 1565 1566 async def subprocess_shell(self, protocol_factory, cmd, *, 1567 stdin=subprocess.PIPE, 1568 stdout=subprocess.PIPE, 1569 stderr=subprocess.PIPE, 1570 universal_newlines=False, 1571 shell=True, bufsize=0, 1572 encoding=None, errors=None, text=None, 1573 **kwargs): 1574 if not isinstance(cmd, (bytes, str)): 1575 raise ValueError("cmd must be a string") 1576 if universal_newlines: 1577 raise ValueError("universal_newlines must be False") 1578 if not shell: 1579 raise ValueError("shell must be True") 1580 if bufsize != 0: 1581 raise ValueError("bufsize must be 0") 1582 if text: 1583 raise ValueError("text must be False") 1584 if encoding is not None: 1585 
raise ValueError("encoding must be None") 1586 if errors is not None: 1587 raise ValueError("errors must be None") 1588 1589 protocol = protocol_factory() 1590 debug_log = None 1591 if self._debug: 1592 # don't log parameters: they may contain sensitive information 1593 # (password) and may be too long 1594 debug_log = 'run shell command %r' % cmd 1595 self._log_subprocess(debug_log, stdin, stdout, stderr) 1596 transport = await self._make_subprocess_transport( 1597 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs) 1598 if self._debug and debug_log is not None: 1599 logger.info('%s: %r', debug_log, transport) 1600 return transport, protocol 1601 1602 async def subprocess_exec(self, protocol_factory, program, *args, 1603 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 1604 stderr=subprocess.PIPE, universal_newlines=False, 1605 shell=False, bufsize=0, 1606 encoding=None, errors=None, text=None, 1607 **kwargs): 1608 if universal_newlines: 1609 raise ValueError("universal_newlines must be False") 1610 if shell: 1611 raise ValueError("shell must be False") 1612 if bufsize != 0: 1613 raise ValueError("bufsize must be 0") 1614 if text: 1615 raise ValueError("text must be False") 1616 if encoding is not None: 1617 raise ValueError("encoding must be None") 1618 if errors is not None: 1619 raise ValueError("errors must be None") 1620 1621 popen_args = (program,) + args 1622 protocol = protocol_factory() 1623 debug_log = None 1624 if self._debug: 1625 # don't log parameters: they may contain sensitive information 1626 # (password) and may be too long 1627 debug_log = f'execute program {program!r}' 1628 self._log_subprocess(debug_log, stdin, stdout, stderr) 1629 transport = await self._make_subprocess_transport( 1630 protocol, popen_args, False, stdin, stdout, stderr, 1631 bufsize, **kwargs) 1632 if self._debug and debug_log is not None: 1633 logger.info('%s: %r', debug_log, transport) 1634 return transport, protocol 1635 1636 def get_exception_handler(self): 1637 
"""Return an exception handler, or None if the default one is in use. 1638 """ 1639 return self._exception_handler 1640 1641 def set_exception_handler(self, handler): 1642 """Set handler as the new event loop exception handler. 1643 1644 If handler is None, the default exception handler will 1645 be set. 1646 1647 If handler is a callable object, it should have a 1648 signature matching '(loop, context)', where 'loop' 1649 will be a reference to the active event loop, 'context' 1650 will be a dict object (see `call_exception_handler()` 1651 documentation for details about context). 1652 """ 1653 if handler is not None and not callable(handler): 1654 raise TypeError(f'A callable object or None is expected, ' 1655 f'got {handler!r}') 1656 self._exception_handler = handler 1657 1658 def default_exception_handler(self, context): 1659 """Default exception handler. 1660 1661 This is called when an exception occurs and no exception 1662 handler is set, and can be called by a custom exception 1663 handler that wants to defer to the default behavior. 1664 1665 This default handler logs the error message and other 1666 context-dependent information. In debug mode, a truncated 1667 stack trace is also appended showing where the given object 1668 (e.g. a handle or future or task) was created, if any. 1669 1670 The context parameter has the same meaning as in 1671 `call_exception_handler()`. 
1672 """ 1673 message = context.get('message') 1674 if not message: 1675 message = 'Unhandled exception in event loop' 1676 1677 exception = context.get('exception') 1678 if exception is not None: 1679 exc_info = (type(exception), exception, exception.__traceback__) 1680 else: 1681 exc_info = False 1682 1683 if ('source_traceback' not in context and 1684 self._current_handle is not None and 1685 self._current_handle._source_traceback): 1686 context['handle_traceback'] = \ 1687 self._current_handle._source_traceback 1688 1689 log_lines = [message] 1690 for key in sorted(context): 1691 if key in {'message', 'exception'}: 1692 continue 1693 value = context[key] 1694 if key == 'source_traceback': 1695 tb = ''.join(traceback.format_list(value)) 1696 value = 'Object created at (most recent call last):\n' 1697 value += tb.rstrip() 1698 elif key == 'handle_traceback': 1699 tb = ''.join(traceback.format_list(value)) 1700 value = 'Handle created at (most recent call last):\n' 1701 value += tb.rstrip() 1702 else: 1703 value = repr(value) 1704 log_lines.append(f'{key}: {value}') 1705 1706 logger.error('\n'.join(log_lines), exc_info=exc_info) 1707 1708 def call_exception_handler(self, context): 1709 """Call the current event loop's exception handler. 1710 1711 The context argument is a dict containing the following keys: 1712 1713 - 'message': Error message; 1714 - 'exception' (optional): Exception object; 1715 - 'future' (optional): Future instance; 1716 - 'task' (optional): Task instance; 1717 - 'handle' (optional): Handle instance; 1718 - 'protocol' (optional): Protocol instance; 1719 - 'transport' (optional): Transport instance; 1720 - 'socket' (optional): Socket instance; 1721 - 'asyncgen' (optional): Asynchronous generator that caused 1722 the exception. 1723 1724 New keys maybe introduced in the future. 1725 1726 Note: do not overload this method in an event loop subclass. 1727 For custom exception handling, use the 1728 `set_exception_handler()` method. 
1729 """ 1730 if self._exception_handler is None: 1731 try: 1732 self.default_exception_handler(context) 1733 except (SystemExit, KeyboardInterrupt): 1734 raise 1735 except BaseException: 1736 # Second protection layer for unexpected errors 1737 # in the default implementation, as well as for subclassed 1738 # event loops with overloaded "default_exception_handler". 1739 logger.error('Exception in default exception handler', 1740 exc_info=True) 1741 else: 1742 try: 1743 self._exception_handler(self, context) 1744 except (SystemExit, KeyboardInterrupt): 1745 raise 1746 except BaseException as exc: 1747 # Exception in the user set custom exception handler. 1748 try: 1749 # Let's try default handler. 1750 self.default_exception_handler({ 1751 'message': 'Unhandled error in exception handler', 1752 'exception': exc, 1753 'context': context, 1754 }) 1755 except (SystemExit, KeyboardInterrupt): 1756 raise 1757 except BaseException: 1758 # Guard 'default_exception_handler' in case it is 1759 # overloaded. 1760 logger.error('Exception in default exception handler ' 1761 'while handling an unexpected error ' 1762 'in custom exception handler', 1763 exc_info=True) 1764 1765 def _add_callback(self, handle): 1766 """Add a Handle to _scheduled (TimerHandle) or _ready.""" 1767 assert isinstance(handle, events.Handle), 'A Handle is required here' 1768 if handle._cancelled: 1769 return 1770 assert not isinstance(handle, events.TimerHandle) 1771 self._ready.append(handle) 1772 1773 def _add_callback_signalsafe(self, handle): 1774 """Like _add_callback() but called from a signal handler.""" 1775 self._add_callback(handle) 1776 self._write_to_self() 1777 1778 def _timer_handle_cancelled(self, handle): 1779 """Notification that a TimerHandle has been cancelled.""" 1780 if handle._scheduled: 1781 self._timer_cancelled_count += 1 1782 1783 def _run_once(self): 1784 """Run one full iteration of the event loop. 
1785 1786 This calls all currently ready callbacks, polls for I/O, 1787 schedules the resulting callbacks, and finally schedules 1788 'call_later' callbacks. 1789 """ 1790 1791 sched_count = len(self._scheduled) 1792 if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and 1793 self._timer_cancelled_count / sched_count > 1794 _MIN_CANCELLED_TIMER_HANDLES_FRACTION): 1795 # Remove delayed calls that were cancelled if their number 1796 # is too high 1797 new_scheduled = [] 1798 for handle in self._scheduled: 1799 if handle._cancelled: 1800 handle._scheduled = False 1801 else: 1802 new_scheduled.append(handle) 1803 1804 heapq.heapify(new_scheduled) 1805 self._scheduled = new_scheduled 1806 self._timer_cancelled_count = 0 1807 else: 1808 # Remove delayed calls that were cancelled from head of queue. 1809 while self._scheduled and self._scheduled[0]._cancelled: 1810 self._timer_cancelled_count -= 1 1811 handle = heapq.heappop(self._scheduled) 1812 handle._scheduled = False 1813 1814 timeout = None 1815 if self._ready or self._stopping: 1816 timeout = 0 1817 elif self._scheduled: 1818 # Compute the desired timeout. 1819 when = self._scheduled[0]._when 1820 timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT) 1821 1822 event_list = self._selector.select(timeout) 1823 self._process_events(event_list) 1824 1825 # Handle 'later' callbacks that are ready. 1826 end_time = self.time() + self._clock_resolution 1827 while self._scheduled: 1828 handle = self._scheduled[0] 1829 if handle._when >= end_time: 1830 break 1831 handle = heapq.heappop(self._scheduled) 1832 handle._scheduled = False 1833 self._ready.append(handle) 1834 1835 # This is the only place where callbacks are actually *called*. 1836 # All other places just add them to ready. 1837 # Note: We run all currently scheduled callbacks, but not any 1838 # callbacks scheduled by callbacks run this time around -- 1839 # they will be run the next time (after another I/O poll). 
1840 # Use an idiom that is thread-safe without using locks. 1841 ntodo = len(self._ready) 1842 for i in range(ntodo): 1843 handle = self._ready.popleft() 1844 if handle._cancelled: 1845 continue 1846 if self._debug: 1847 try: 1848 self._current_handle = handle 1849 t0 = self.time() 1850 handle._run() 1851 dt = self.time() - t0 1852 if dt >= self.slow_callback_duration: 1853 logger.warning('Executing %s took %.3f seconds', 1854 _format_handle(handle), dt) 1855 finally: 1856 self._current_handle = None 1857 else: 1858 handle._run() 1859 handle = None # Needed to break cycles when an exception occurs. 1860 1861 def _set_coroutine_origin_tracking(self, enabled): 1862 if bool(enabled) == bool(self._coroutine_origin_tracking_enabled): 1863 return 1864 1865 if enabled: 1866 self._coroutine_origin_tracking_saved_depth = ( 1867 sys.get_coroutine_origin_tracking_depth()) 1868 sys.set_coroutine_origin_tracking_depth( 1869 constants.DEBUG_STACK_DEPTH) 1870 else: 1871 sys.set_coroutine_origin_tracking_depth( 1872 self._coroutine_origin_tracking_saved_depth) 1873 1874 self._coroutine_origin_tracking_enabled = enabled 1875 1876 def get_debug(self): 1877 return self._debug 1878 1879 def set_debug(self, enabled): 1880 self._debug = enabled 1881 1882 if self.is_running(): 1883 self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled) 1884