"""Event loop and event loop policy."""

__all__ = (
    'AbstractEventLoopPolicy',
    'AbstractEventLoop', 'AbstractServer',
    'Handle', 'TimerHandle',
    'get_event_loop_policy', 'set_event_loop_policy',
    'get_event_loop', 'set_event_loop', 'new_event_loop',
    'get_child_watcher', 'set_child_watcher',
    '_set_running_loop', 'get_running_loop',
    '_get_running_loop',
)

import contextvars
import os
import socket
import subprocess
import sys
import threading

from . import format_helpers


class Handle:
    """Object returned by callback registration methods."""

    __slots__ = ('_callback', '_args', '_cancelled', '_loop',
                 '_source_traceback', '_repr', '__weakref__',
                 '_context')

    def __init__(self, callback, args, loop, context=None):
        """Store the callback, its arguments, the loop and the context.

        When *context* is None a copy of the current contextvars context
        is taken; the callback is later run inside that context.
        """
        if context is None:
            context = contextvars.copy_context()
        self._context = context
        self._loop = loop
        self._callback = callback
        self._args = args
        self._cancelled = False
        self._repr = None
        if self._loop.get_debug():
            # In debug mode remember where the handle was created;
            # frame 1 is the caller of this __init__.
            self._source_traceback = format_helpers.extract_stack(
                sys._getframe(1))
        else:
            self._source_traceback = None

    def _repr_info(self):
        """Return the list of string fragments that make up repr(self)."""
        info = [self.__class__.__name__]
        if self._cancelled:
            info.append('cancelled')
        if self._callback is not None:
            info.append(format_helpers._format_callback_source(
                self._callback, self._args))
        if self._source_traceback:
            frame = self._source_traceback[-1]
            info.append(f'created at {frame[0]}:{frame[1]}')
        return info

    def __repr__(self):
        # A representation frozen by cancel() takes precedence.
        if self._repr is not None:
            return self._repr
        info = self._repr_info()
        return '<{}>'.format(' '.join(info))

    def cancel(self):
        """Mark the handle as cancelled and drop callback and arguments.

        Calling it again on an already cancelled handle does nothing.
        """
        if not self._cancelled:
            self._cancelled = True
            if self._loop.get_debug():
                # Keep a representation in debug mode to keep callback and
                # parameters. For example, to log the warning
                # "Executing <Handle...> took 2.5 second"
                self._repr = repr(self)
            self._callback = None
            self._args = None

    def cancelled(self):
        """Return the cancelled flag of this handle."""
        return self._cancelled

    def _run(self):
        """Run the callback inside the stored context.

        SystemExit and KeyboardInterrupt propagate to the caller; any
        other exception is reported through the loop's
        call_exception_handler() instead of being raised.
        """
        try:
            self._context.run(self._callback, *self._args)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            cb = format_helpers._format_callback_source(
                self._callback, self._args)
            msg = f'Exception in callback {cb}'
            context = {
                'message': msg,
                'exception': exc,
                'handle': self,
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        self = None  # Needed to break cycles when an exception occurs.


class TimerHandle(Handle):
    """Object returned by timed callback registration methods."""

    __slots__ = ['_scheduled', '_when']

    def __init__(self, when, callback, args, loop, context=None):
        """Create a handle for *callback* associated with time *when*."""
        super().__init__(callback, args, loop, context)
        if self._source_traceback:
            # Drop the innermost recorded frame -- presumably this
            # __init__, which is the direct caller of Handle.__init__.
            del self._source_traceback[-1]
        self._when = when
        self._scheduled = False

    def _repr_info(self):
        """Return the base repr fragments with 'when=...' inserted."""
        info = super()._repr_info()
        # Insert after the class name, or after 'cancelled' if present.
        pos = 2 if self._cancelled else 1
        info.insert(pos, f'when={self._when}')
        return info

    def __hash__(self):
        return hash(self._when)

    # Ordering compares the scheduled time only; comparisons against
    # anything that is not a TimerHandle return NotImplemented.

    def __lt__(self, other):
        if isinstance(other, TimerHandle):
            return self._when < other._when
        return NotImplemented

    def __le__(self, other):
        if isinstance(other, TimerHandle):
            return self._when < other._when or self.__eq__(other)
        return NotImplemented

    def __gt__(self, other):
        if isinstance(other, TimerHandle):
            return self._when > other._when
        return NotImplemented

    def __ge__(self, other):
        if isinstance(other, TimerHandle):
            return self._when > other._when or self.__eq__(other)
        return NotImplemented

    def __eq__(self, other):
        # Equality is stricter than ordering: time, callback, arguments
        # and cancellation state must all match.
        if isinstance(other, TimerHandle):
            return (self._when == other._when and
                    self._callback == other._callback and
                    self._args == other._args and
                    self._cancelled == other._cancelled)
        return NotImplemented

    def cancel(self):
        """Notify the loop (first cancellation only), then cancel."""
        if not self._cancelled:
            self._loop._timer_handle_cancelled(self)
        super().cancel()

    def when(self):
        """Return a scheduled callback time.

        The time is an absolute timestamp, using the same time
        reference as loop.time().
        """
        return self._when


class AbstractServer:
    """Abstract server returned by create_server()."""

    def close(self):
        """Stop serving.  This leaves existing connections open."""
        raise NotImplementedError

    def get_loop(self):
        """Get the event loop the Server object is attached to."""
        raise NotImplementedError

    def is_serving(self):
        """Return True if the server is accepting connections."""
        raise NotImplementedError

    async def start_serving(self):
        """Start accepting connections.

        This method is idempotent, so it can be called when
        the server is already serving.
        """
        raise NotImplementedError

    async def serve_forever(self):
        """Start accepting connections until the coroutine is cancelled.

        The server is closed when the coroutine is cancelled.
        """
        raise NotImplementedError

    async def wait_closed(self):
        """Coroutine to wait until service is closed."""
        raise NotImplementedError

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc):
        # Leaving the "async with" block closes the server and waits
        # for the close to complete.
        self.close()
        await self.wait_closed()


class AbstractEventLoop:
    """Abstract event loop.

    Every method raises NotImplementedError, except call_soon(), which
    delegates to call_later() with a zero delay.
    """

    # Running and stopping the event loop.

    def run_forever(self):
        """Run the event loop until stop() is called."""
        raise NotImplementedError

    def run_until_complete(self, future):
        """Run the event loop until a Future is done.

        Return the Future's result, or raise its exception.
        """
        raise NotImplementedError

    def stop(self):
        """Stop the event loop as soon as reasonable.

        Exactly how soon that is may depend on the implementation, but
        no more I/O callbacks should be scheduled.
        """
        raise NotImplementedError

    def is_running(self):
        """Return whether the event loop is currently running."""
        raise NotImplementedError

    def is_closed(self):
        """Returns True if the event loop was closed."""
        raise NotImplementedError

    def close(self):
        """Close the loop.

        The loop should not be running.

        This is idempotent and irreversible.

        No other methods should be called after this one.
        """
        raise NotImplementedError

    async def shutdown_asyncgens(self):
        """Shutdown all active asynchronous generators."""
        raise NotImplementedError

    async def shutdown_default_executor(self):
        """Schedule the shutdown of the default executor."""
        raise NotImplementedError

    # Methods scheduling callbacks.  All these return Handles.

    def _timer_handle_cancelled(self, handle):
        """Notification that a TimerHandle has been cancelled."""
        raise NotImplementedError

    def call_soon(self, callback, *args):
        # Default implementation: schedule through call_later() with a
        # zero delay.
        return self.call_later(0, callback, *args)

    def call_later(self, delay, callback, *args):
        raise NotImplementedError

    def call_at(self, when, callback, *args):
        raise NotImplementedError

    def time(self):
        raise NotImplementedError

    def create_future(self):
        raise NotImplementedError

    # Method scheduling a coroutine object: create a task.

    def create_task(self, coro, *, name=None):
        raise NotImplementedError

    # Methods for interacting with threads.

    def call_soon_threadsafe(self, callback, *args):
        raise NotImplementedError

    def run_in_executor(self, executor, func, *args):
        raise NotImplementedError

    def set_default_executor(self, executor):
        raise NotImplementedError

    # Network I/O methods returning Futures.

    async def getaddrinfo(self, host, port, *,
                          family=0, type=0, proto=0, flags=0):
        raise NotImplementedError

    async def getnameinfo(self, sockaddr, flags=0):
        raise NotImplementedError

    async def create_connection(
            self, protocol_factory, host=None, port=None,
            *, ssl=None, family=0, proto=0,
            flags=0, sock=None, local_addr=None,
            server_hostname=None,
            ssl_handshake_timeout=None,
            happy_eyeballs_delay=None, interleave=None):
        raise NotImplementedError

    async def create_server(
            self, protocol_factory, host=None, port=None,
            *, family=socket.AF_UNSPEC,
            flags=socket.AI_PASSIVE, sock=None, backlog=100,
            ssl=None, reuse_address=None, reuse_port=None,
            ssl_handshake_timeout=None,
            start_serving=True):
        """A coroutine which creates a TCP server bound to host and port.

        The return value is a Server object which can be used to stop
        the service.

        If host is an empty string or None all interfaces are assumed
        and a list of multiple sockets will be returned (most likely
        one for IPv4 and another one for IPv6). The host parameter can also be
        a sequence (e.g. list) of hosts to bind to.

        family can be set to either AF_INET or AF_INET6 to force the
        socket to use IPv4 or IPv6. If not set it will be determined
        from host (defaults to AF_UNSPEC).

        flags is a bitmask for getaddrinfo().

        sock can optionally be specified in order to use a preexisting
        socket object.

        backlog is the maximum number of queued connections passed to
        listen() (defaults to 100).

        ssl can be set to an SSLContext to enable SSL over the
        accepted connections.

        reuse_address tells the kernel to reuse a local socket in
        TIME_WAIT state, without waiting for its natural timeout to
        expire. If not specified will automatically be set to True on
        UNIX.

        reuse_port tells the kernel to allow this endpoint to be bound to
        the same port as other existing endpoints are bound to, so long as
        they all set this flag when being created. This option is not
        supported on Windows.

        ssl_handshake_timeout is the time in seconds that an SSL server
        will wait for completion of the SSL handshake before aborting the
        connection. Default is 60s.

        start_serving set to True (default) causes the created server
        to start accepting connections immediately.  When set to False,
        the user should await Server.start_serving() or Server.serve_forever()
        to make the server start accepting connections.
        """
        raise NotImplementedError

    async def sendfile(self, transport, file, offset=0, count=None,
                       *, fallback=True):
        """Send a file through a transport.

        Return an amount of sent bytes.
        """
        raise NotImplementedError

    async def start_tls(self, transport, protocol, sslcontext, *,
                        server_side=False,
                        server_hostname=None,
                        ssl_handshake_timeout=None):
        """Upgrade a transport to TLS.

        Return a new transport that *protocol* should start using
        immediately.
        """
        raise NotImplementedError

    async def create_unix_connection(
            self, protocol_factory, path=None, *,
            ssl=None, sock=None,
            server_hostname=None,
            ssl_handshake_timeout=None):
        raise NotImplementedError

    async def create_unix_server(
            self, protocol_factory, path=None, *,
            sock=None, backlog=100, ssl=None,
            ssl_handshake_timeout=None,
            start_serving=True):
        """A coroutine which creates a UNIX Domain Socket server.

        The return value is a Server object, which can be used to stop
        the service.

        path is a str, representing a file system path to bind the
        server socket to.

        sock can optionally be specified in order to use a preexisting
        socket object.

        backlog is the maximum number of queued connections passed to
        listen() (defaults to 100).

        ssl can be set to an SSLContext to enable SSL over the
        accepted connections.

        ssl_handshake_timeout is the time in seconds that an SSL server
        will wait for the SSL handshake to complete (defaults to 60s).

        start_serving set to True (default) causes the created server
        to start accepting connections immediately.  When set to False,
        the user should await Server.start_serving() or Server.serve_forever()
        to make the server start accepting connections.
        """
        raise NotImplementedError

    async def connect_accepted_socket(
            self, protocol_factory, sock,
            *, ssl=None,
            ssl_handshake_timeout=None):
        """Handle an accepted connection.

        This is used by servers that accept connections outside of
        asyncio, but use asyncio to handle connections.

        This method is a coroutine.  When completed, the coroutine
        returns a (transport, protocol) pair.
        """
        raise NotImplementedError

    async def create_datagram_endpoint(self, protocol_factory,
                                       local_addr=None, remote_addr=None, *,
                                       family=0, proto=0, flags=0,
                                       reuse_address=None, reuse_port=None,
                                       allow_broadcast=None, sock=None):
        """A coroutine which creates a datagram endpoint.

        This method will try to establish the endpoint in the background.
        When successful, the coroutine returns a (transport, protocol) pair.

        protocol_factory must be a callable returning a protocol instance.

        socket family AF_INET, socket.AF_INET6 or socket.AF_UNIX depending on
        host (or family if specified), socket type SOCK_DGRAM.

        reuse_address tells the kernel to reuse a local socket in
        TIME_WAIT state, without waiting for its natural timeout to
        expire. If not specified it will automatically be set to True on
        UNIX.

        reuse_port tells the kernel to allow this endpoint to be bound to
        the same port as other existing endpoints are bound to, so long as
        they all set this flag when being created. This option is not
        supported on Windows and some UNIX's. If the
        :py:data:`~socket.SO_REUSEPORT` constant is not defined then this
        capability is unsupported.

        allow_broadcast tells the kernel to allow this endpoint to send
        messages to the broadcast address.

        sock can optionally be specified in order to use a preexisting
        socket object.
        """
        raise NotImplementedError

    # Pipes and subprocesses.

    async def connect_read_pipe(self, protocol_factory, pipe):
        """Register read pipe in event loop. Set the pipe to non-blocking mode.

        protocol_factory should instantiate object with Protocol interface.
        pipe is a file-like object.
        Return pair (transport, protocol), where transport supports the
        ReadTransport interface."""
        # The reason to accept a file-like object instead of just a file
        # descriptor is: we need to own the pipe and close it when the
        # transport finishes.  Passing f.fileno() can lead to complicated
        # errors: the fd is closed in the pipe transport and then f is
        # closed, or vice versa.
        raise NotImplementedError

    async def connect_write_pipe(self, protocol_factory, pipe):
        """Register write pipe in event loop.

        protocol_factory should instantiate object with BaseProtocol interface.
        Pipe is file-like object already switched to nonblocking.
        Return pair (transport, protocol), where transport supports the
        WriteTransport interface."""
        # The reason to accept a file-like object instead of just a file
        # descriptor is: we need to own the pipe and close it when the
        # transport finishes.  Passing f.fileno() can lead to complicated
        # errors: the fd is closed in the pipe transport and then f is
        # closed, or vice versa.
        raise NotImplementedError

    async def subprocess_shell(self, protocol_factory, cmd, *,
                               stdin=subprocess.PIPE,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE,
                               **kwargs):
        raise NotImplementedError

    async def subprocess_exec(self, protocol_factory, *args,
                              stdin=subprocess.PIPE,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              **kwargs):
        raise NotImplementedError

    # Ready-based callback registration methods.
    # The add_*() methods return None.
    # The remove_*() methods return True if something was removed,
    # False if there was nothing to delete.

    def add_reader(self, fd, callback, *args):
        raise NotImplementedError

    def remove_reader(self, fd):
        raise NotImplementedError

    def add_writer(self, fd, callback, *args):
        raise NotImplementedError

    def remove_writer(self, fd):
        raise NotImplementedError

    # Completion based I/O methods returning Futures.

    async def sock_recv(self, sock, nbytes):
        raise NotImplementedError

    async def sock_recv_into(self, sock, buf):
        raise NotImplementedError

    async def sock_sendall(self, sock, data):
        raise NotImplementedError

    async def sock_connect(self, sock, address):
        raise NotImplementedError

    async def sock_accept(self, sock):
        raise NotImplementedError

    async def sock_sendfile(self, sock, file, offset=0, count=None,
                            *, fallback=None):
        raise NotImplementedError

    # Signal handling.

    def add_signal_handler(self, sig, callback, *args):
        raise NotImplementedError

    def remove_signal_handler(self, sig):
        raise NotImplementedError

    # Task factory.

    def set_task_factory(self, factory):
        raise NotImplementedError

    def get_task_factory(self):
        raise NotImplementedError

    # Error handlers.

    def get_exception_handler(self):
        raise NotImplementedError

    def set_exception_handler(self, handler):
        raise NotImplementedError

    def default_exception_handler(self, context):
        raise NotImplementedError

    def call_exception_handler(self, context):
        raise NotImplementedError

    # Debug flag management.

    def get_debug(self):
        raise NotImplementedError

    def set_debug(self, enabled):
        raise NotImplementedError


class AbstractEventLoopPolicy:
    """Abstract policy for accessing the event loop."""

    def get_event_loop(self):
        """Get the event loop for the current context.

        Returns an event loop object implementing the AbstractEventLoop
        interface, or raises an exception in case no event loop has been
        set for the current context and the current policy does not
        specify to create one.

        It should never return None."""
        raise NotImplementedError

    def set_event_loop(self, loop):
        """Set the event loop for the current context to loop."""
        raise NotImplementedError

    def new_event_loop(self):
        """Create and return a new event loop object according to this
        policy's rules. If there's need to set this loop as the event loop for
        the current context, set_event_loop must be called explicitly."""
        raise NotImplementedError

    # Child processes handling (Unix only).

    def get_child_watcher(self):
        """Get the watcher for child processes."""
        raise NotImplementedError

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes."""
        raise NotImplementedError


class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
    """Default policy implementation for accessing the event loop.

    In this policy, each thread has its own event loop.  However, we
    only automatically create an event loop by default for the main
    thread; other threads by default have no event loop.

    Other policies may have different rules (e.g. a single global
    event loop, or automatically creating an event loop per thread, or
    using some other notion of context to which an event loop is
    associated).
    """

    # Callable returning a new loop; None here, so new_event_loop() only
    # works once a subclass (or instance) provides a factory.
    _loop_factory = None

    class _Local(threading.local):
        # Per-thread state: the current loop, and whether set_event_loop()
        # has been called in this thread.
        _loop = None
        _set_called = False

    def __init__(self):
        self._local = self._Local()

    def get_event_loop(self):
        """Get the event loop for the current context.

        Returns an instance of EventLoop or raises an exception.
        """
        # Lazily create a loop only in the main thread, and only while
        # set_event_loop() has not been called in it.
        if (self._local._loop is None and
                not self._local._set_called and
                threading.current_thread() is threading.main_thread()):
            self.set_event_loop(self.new_event_loop())

        if self._local._loop is None:
            raise RuntimeError('There is no current event loop in thread %r.'
                               % threading.current_thread().name)

        return self._local._loop

    def set_event_loop(self, loop):
        """Set the event loop.

        Raises TypeError if *loop* is neither None nor an
        AbstractEventLoop instance; in that case no state is changed.
        """
        # Validate before touching any state: a rejected call must not
        # flip _set_called, otherwise get_event_loop() would stop
        # auto-creating the main-thread loop after a failed call.
        if loop is not None and not isinstance(loop, AbstractEventLoop):
            raise TypeError(f"loop must be an instance of AbstractEventLoop or None, not '{type(loop).__name__}'")
        self._local._set_called = True
        self._local._loop = loop

    def new_event_loop(self):
        """Create a new event loop.

        You must call set_event_loop() to make this the current event
        loop.
        """
        return self._loop_factory()


# Event loop policy.  The policy itself is always global, even if the
# policy's rules say that there is an event loop per thread (or other
# notion of context).  The default policy is installed by the first
# call to get_event_loop_policy().
_event_loop_policy = None

# Lock for protecting the on-the-fly creation of the event loop policy.
_lock = threading.Lock()


# A TLS for the running event loop, used by _get_running_loop.
class _RunningLoop(threading.local):
    # (loop, pid) pair; the pid records which process set the loop.
    loop_pid = (None, None)


_running_loop = _RunningLoop()


def get_running_loop():
    """Return the running event loop.  Raise a RuntimeError if there is none.

    This function is thread-specific.
    """
    # NOTE: this function is implemented in C (see _asynciomodule.c)
    loop = _get_running_loop()
    if loop is None:
        raise RuntimeError('no running event loop')
    return loop


def _get_running_loop():
    """Return the running event loop or None.

    This is a low-level function intended to be used by event loops.
    This function is thread-specific.
    """
    # NOTE: this function is implemented in C (see _asynciomodule.c)
    running_loop, pid = _running_loop.loop_pid
    # Only report a loop that was set by the current process.
    if running_loop is not None and pid == os.getpid():
        return running_loop
    return None


def _set_running_loop(loop):
    """Set the running event loop.

    This is a low-level function intended to be used by event loops.
    This function is thread-specific.
    """
    # NOTE: this function is implemented in C (see _asynciomodule.c)
    _running_loop.loop_pid = (loop, os.getpid())


def _init_event_loop_policy():
    """Install the default policy if no policy is set yet."""
    global _event_loop_policy
    with _lock:
        # Re-check under the lock: another thread may have installed the
        # policy in the meantime.
        if _event_loop_policy is None:  # pragma: no branch
            from . import DefaultEventLoopPolicy
            _event_loop_policy = DefaultEventLoopPolicy()


def get_event_loop_policy():
    """Get the current event loop policy."""
    if _event_loop_policy is None:
        _init_event_loop_policy()
    return _event_loop_policy


def set_event_loop_policy(policy):
    """Set the current event loop policy.

    If policy is None, the default policy is restored."""
    global _event_loop_policy
    if policy is not None and not isinstance(policy, AbstractEventLoopPolicy):
        raise TypeError(f"policy must be an instance of AbstractEventLoopPolicy or None, not '{type(policy).__name__}'")
    _event_loop_policy = policy


def get_event_loop():
    """Return an asyncio event loop.

    When called from a coroutine or a callback (e.g. scheduled with call_soon
    or similar API), this function will always return the running event loop.

    If there is no running event loop set, the function will return
    the result of `get_event_loop_policy().get_event_loop()` call.
    """
    # NOTE: this function is implemented in C (see _asynciomodule.c)
    return _py__get_event_loop()


def _get_event_loop(stacklevel=3):
    """Return the running loop, else warn and ask the policy for one.

    *stacklevel* is forwarded to warnings.warn() for the
    DeprecationWarning issued when no loop is running.
    """
    current_loop = _get_running_loop()
    if current_loop is not None:
        return current_loop
    import warnings
    warnings.warn('There is no current event loop',
                  DeprecationWarning, stacklevel=stacklevel)
    return get_event_loop_policy().get_event_loop()


def set_event_loop(loop):
    """Equivalent to calling get_event_loop_policy().set_event_loop(loop)."""
    get_event_loop_policy().set_event_loop(loop)


def new_event_loop():
    """Equivalent to calling get_event_loop_policy().new_event_loop()."""
    return get_event_loop_policy().new_event_loop()


def get_child_watcher():
    """Equivalent to calling get_event_loop_policy().get_child_watcher()."""
    return get_event_loop_policy().get_child_watcher()


def set_child_watcher(watcher):
    """Equivalent to calling
    get_event_loop_policy().set_child_watcher(watcher)."""
    return get_event_loop_policy().set_child_watcher(watcher)


# Alias pure-Python implementations for testing purposes.
_py__get_running_loop = _get_running_loop
_py__set_running_loop = _set_running_loop
_py_get_running_loop = get_running_loop
_py_get_event_loop = get_event_loop
_py__get_event_loop = _get_event_loop


try:
    # get_event_loop() is one of the most frequently called
    # functions in asyncio.  Pure Python implementation is
    # about 4 times slower than C-accelerated.
    from _asyncio import (_get_running_loop, _set_running_loop,
                          get_running_loop, get_event_loop, _get_event_loop)
except ImportError:
    pass
else:
    # Alias C implementations for testing purposes.
    _c__get_running_loop = _get_running_loop
    _c__set_running_loop = _set_running_loop
    _c_get_running_loop = get_running_loop
    _c_get_event_loop = get_event_loop
    _c__get_event_loop = _get_event_loop