1"""Event loop and event loop policy.""" 2from __future__ import absolute_import 3 4__all__ = ['AbstractEventLoopPolicy', 5 'AbstractEventLoop', 'AbstractServer', 6 'Handle', 'TimerHandle', 7 'get_event_loop_policy', 'set_event_loop_policy', 8 'get_event_loop', 'set_event_loop', 'new_event_loop', 9 'get_child_watcher', 'set_child_watcher', 10 ] 11 12import functools 13import inspect 14import socket 15import subprocess 16import sys 17import threading 18import traceback 19try: 20 import reprlib # Python 3 21except ImportError: 22 import repr as reprlib # Python 2 23 24try: 25 import asyncio 26except (ImportError, SyntaxError): 27 # ignore SyntaxError for convenience: ignore SyntaxError caused by "yield 28 # from" if asyncio module is in the Python path 29 asyncio = None 30 31from trollius import compat 32 33 34def _get_function_source(func): 35 if compat.PY34: 36 func = inspect.unwrap(func) 37 elif hasattr(func, '__wrapped__'): 38 func = func.__wrapped__ 39 if inspect.isfunction(func): 40 code = func.__code__ 41 return (code.co_filename, code.co_firstlineno) 42 if isinstance(func, functools.partial): 43 return _get_function_source(func.func) 44 if compat.PY34 and isinstance(func, functools.partialmethod): 45 return _get_function_source(func.func) 46 return None 47 48 49def _format_args(args): 50 """Format function arguments. 51 52 Special case for a single parameter: ('hello',) is formatted as ('hello'). 53 """ 54 # use reprlib to limit the length of the output 55 args_repr = reprlib.repr(args) 56 if len(args) == 1 and args_repr.endswith(',)'): 57 args_repr = args_repr[:-2] + ')' 58 return args_repr 59 60 61def _format_callback(func, args, suffix=''): 62 if isinstance(func, functools.partial): 63 if args is not None: 64 suffix = _format_args(args) + suffix 65 return _format_callback(func.func, func.args, suffix) 66 67 if hasattr(func, '__qualname__'): 68 func_repr = getattr(func, '__qualname__') 69 elif hasattr(func, '__name__'): 70 func_repr = getattr(func, '__name__') 71 else: 72 func_repr = repr(func) 73 74 if args is not None: 75 func_repr += _format_args(args) 76 if suffix: 77 func_repr += suffix 78 return func_repr 79 80def _format_callback_source(func, args): 81 func_repr = _format_callback(func, args) 82 source = _get_function_source(func) 83 if source: 84 func_repr += ' at %s:%s' % source 85 return func_repr 86 87 88class Handle(object): 89 """Object returned by callback registration methods.""" 90 91 __slots__ = ('_callback', '_args', '_cancelled', '_loop', 92 '_source_traceback', '_repr', '__weakref__') 93 94 def __init__(self, callback, args, loop): 95 assert not isinstance(callback, Handle), 'A Handle is not a callback' 96 self._loop = loop 97 self._callback = callback 98 self._args = args 99 self._cancelled = False 100 self._repr = None 101 if self._loop.get_debug(): 102 self._source_traceback = traceback.extract_stack(sys._getframe(1)) 103 else: 104 self._source_traceback = None 105 106 def _repr_info(self): 107 info = [self.__class__.__name__] 108 if self._cancelled: 109 info.append('cancelled') 110 if self._callback is not None: 111 info.append(_format_callback_source(self._callback, self._args)) 112 if self._source_traceback: 113 frame = self._source_traceback[-1] 114 info.append('created at %s:%s' % (frame[0], frame[1])) 115 return info 116 117 def __repr__(self): 118 if self._repr is not None: 119 return self._repr 120 info = self._repr_info() 121 return '<%s>' % ' '.join(info) 122 123 def cancel(self): 124 if not self._cancelled: 125 self._cancelled = True 126 if self._loop.get_debug(): 127 # Keep a representation in debug mode to keep callback and 128 # parameters. For example, to log the warning 129 # "Executing <Handle...> took 2.5 second" 130 self._repr = repr(self) 131 self._callback = None 132 self._args = None 133 134 def _run(self): 135 try: 136 self._callback(*self._args) 137 except Exception as exc: 138 cb = _format_callback_source(self._callback, self._args) 139 msg = 'Exception in callback {0}'.format(cb) 140 context = { 141 'message': msg, 142 'exception': exc, 143 'handle': self, 144 } 145 if self._source_traceback: 146 context['source_traceback'] = self._source_traceback 147 self._loop.call_exception_handler(context) 148 self = None # Needed to break cycles when an exception occurs. 149 150 151class TimerHandle(Handle): 152 """Object returned by timed callback registration methods.""" 153 154 __slots__ = ['_scheduled', '_when'] 155 156 def __init__(self, when, callback, args, loop): 157 assert when is not None 158 super(TimerHandle, self).__init__(callback, args, loop) 159 if self._source_traceback: 160 del self._source_traceback[-1] 161 self._when = when 162 self._scheduled = False 163 164 def _repr_info(self): 165 info = super(TimerHandle, self)._repr_info() 166 pos = 2 if self._cancelled else 1 167 info.insert(pos, 'when=%s' % self._when) 168 return info 169 170 def __hash__(self): 171 return hash(self._when) 172 173 def __lt__(self, other): 174 return self._when < other._when 175 176 def __le__(self, other): 177 if self._when < other._when: 178 return True 179 return self.__eq__(other) 180 181 def __gt__(self, other): 182 return self._when > other._when 183 184 def __ge__(self, other): 185 if self._when > other._when: 186 return True 187 return self.__eq__(other) 188 189 def __eq__(self, other): 190 if isinstance(other, TimerHandle): 191 return (self._when == other._when and 192 self._callback == other._callback and 193 self._args == other._args and 194 self._cancelled == other._cancelled) 195 return NotImplemented 196 197 def __ne__(self, other): 198 equal = self.__eq__(other) 199 return NotImplemented if equal is NotImplemented else not equal 200 201 def cancel(self): 202 if not self._cancelled: 203 self._loop._timer_handle_cancelled(self) 204 super(TimerHandle, self).cancel() 205 206 207class AbstractServer(object): 208 """Abstract server returned by create_server().""" 209 210 def close(self): 211 """Stop serving. This leaves existing connections open.""" 212 return NotImplemented 213 214 def wait_closed(self): 215 """Coroutine to wait until service is closed.""" 216 return NotImplemented 217 218 219if asyncio is not None: 220 # Reuse asyncio classes so asyncio.set_event_loop() and 221 # asyncio.set_event_loop_policy() accept Trollius event loop and trollius 222 # event loop policy 223 AbstractEventLoop = asyncio.AbstractEventLoop 224 AbstractEventLoopPolicy = asyncio.AbstractEventLoopPolicy 225else: 226 class AbstractEventLoop(object): 227 """Abstract event loop.""" 228 229 # Running and stopping the event loop. 230 231 def run_forever(self): 232 """Run the event loop until stop() is called.""" 233 raise NotImplementedError 234 235 def run_until_complete(self, future): 236 """Run the event loop until a Future is done. 237 238 Return the Future's result, or raise its exception. 239 """ 240 raise NotImplementedError 241 242 def stop(self): 243 """Stop the event loop as soon as reasonable. 244 245 Exactly how soon that is may depend on the implementation, but 246 no more I/O callbacks should be scheduled. 247 """ 248 raise NotImplementedError 249 250 def is_running(self): 251 """Return whether the event loop is currently running.""" 252 raise NotImplementedError 253 254 def is_closed(self): 255 """Returns True if the event loop was closed.""" 256 raise NotImplementedError 257 258 def close(self): 259 """Close the loop. 260 261 The loop should not be running. 262 263 This is idempotent and irreversible. 264 265 No other methods should be called after this one. 266 """ 267 raise NotImplementedError 268 269 # Methods scheduling callbacks. All these return Handles. 270 271 def _timer_handle_cancelled(self, handle): 272 """Notification that a TimerHandle has been cancelled.""" 273 raise NotImplementedError 274 275 def call_soon(self, callback, *args): 276 return self.call_later(0, callback, *args) 277 278 def call_later(self, delay, callback, *args): 279 raise NotImplementedError 280 281 def call_at(self, when, callback, *args): 282 raise NotImplementedError 283 284 def time(self): 285 raise NotImplementedError 286 287 # Method scheduling a coroutine object: create a task. 288 289 def create_task(self, coro): 290 raise NotImplementedError 291 292 # Methods for interacting with threads. 293 294 def call_soon_threadsafe(self, callback, *args): 295 raise NotImplementedError 296 297 def run_in_executor(self, executor, func, *args): 298 raise NotImplementedError 299 300 def set_default_executor(self, executor): 301 raise NotImplementedError 302 303 # Network I/O methods returning Futures. 304 305 def getaddrinfo(self, host, port, family=0, type=0, proto=0, flags=0): 306 raise NotImplementedError 307 308 def getnameinfo(self, sockaddr, flags=0): 309 raise NotImplementedError 310 311 def create_connection(self, protocol_factory, host=None, port=None, 312 ssl=None, family=0, proto=0, flags=0, sock=None, 313 local_addr=None, server_hostname=None): 314 raise NotImplementedError 315 316 def create_server(self, protocol_factory, host=None, port=None, 317 family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, 318 sock=None, backlog=100, ssl=None, reuse_address=None): 319 """A coroutine which creates a TCP server bound to host and port. 320 321 The return value is a Server object which can be used to stop 322 the service. 323 324 If host is an empty string or None all interfaces are assumed 325 and a list of multiple sockets will be returned (most likely 326 one for IPv4 and another one for IPv6). 327 328 family can be set to either AF_INET or AF_INET6 to force the 329 socket to use IPv4 or IPv6. If not set it will be determined 330 from host (defaults to AF_UNSPEC). 331 332 flags is a bitmask for getaddrinfo(). 333 334 sock can optionally be specified in order to use a preexisting 335 socket object. 336 337 backlog is the maximum number of queued connections passed to 338 listen() (defaults to 100). 339 340 ssl can be set to an SSLContext to enable SSL over the 341 accepted connections. 342 343 reuse_address tells the kernel to reuse a local socket in 344 TIME_WAIT state, without waiting for its natural timeout to 345 expire. If not specified will automatically be set to True on 346 UNIX. 347 """ 348 raise NotImplementedError 349 350 def create_unix_connection(self, protocol_factory, path, 351 ssl=None, sock=None, 352 server_hostname=None): 353 raise NotImplementedError 354 355 def create_unix_server(self, protocol_factory, path, 356 sock=None, backlog=100, ssl=None): 357 """A coroutine which creates a UNIX Domain Socket server. 358 359 The return value is a Server object, which can be used to stop 360 the service. 361 362 path is a str, representing a file systsem path to bind the 363 server socket to. 364 365 sock can optionally be specified in order to use a preexisting 366 socket object. 367 368 backlog is the maximum number of queued connections passed to 369 listen() (defaults to 100). 370 371 ssl can be set to an SSLContext to enable SSL over the 372 accepted connections. 373 """ 374 raise NotImplementedError 375 376 def create_datagram_endpoint(self, protocol_factory, 377 local_addr=None, remote_addr=None, 378 family=0, proto=0, flags=0): 379 raise NotImplementedError 380 381 # Pipes and subprocesses. 382 383 def connect_read_pipe(self, protocol_factory, pipe): 384 """Register read pipe in event loop. Set the pipe to non-blocking mode. 385 386 protocol_factory should instantiate object with Protocol interface. 387 pipe is a file-like object. 388 Return pair (transport, protocol), where transport supports the 389 ReadTransport interface.""" 390 # The reason to accept file-like object instead of just file descriptor 391 # is: we need to own pipe and close it at transport finishing 392 # Can got complicated errors if pass f.fileno(), 393 # close fd in pipe transport then close f and vise versa. 394 raise NotImplementedError 395 396 def connect_write_pipe(self, protocol_factory, pipe): 397 """Register write pipe in event loop. 398 399 protocol_factory should instantiate object with BaseProtocol interface. 400 Pipe is file-like object already switched to nonblocking. 401 Return pair (transport, protocol), where transport support 402 WriteTransport interface.""" 403 # The reason to accept file-like object instead of just file descriptor 404 # is: we need to own pipe and close it at transport finishing 405 # Can got complicated errors if pass f.fileno(), 406 # close fd in pipe transport then close f and vise versa. 407 raise NotImplementedError 408 409 def subprocess_shell(self, protocol_factory, cmd, stdin=subprocess.PIPE, 410 stdout=subprocess.PIPE, stderr=subprocess.PIPE, 411 **kwargs): 412 raise NotImplementedError 413 414 def subprocess_exec(self, protocol_factory, *args, **kwargs): 415 raise NotImplementedError 416 417 # Ready-based callback registration methods. 418 # The add_*() methods return None. 419 # The remove_*() methods return True if something was removed, 420 # False if there was nothing to delete. 421 422 def add_reader(self, fd, callback, *args): 423 raise NotImplementedError 424 425 def remove_reader(self, fd): 426 raise NotImplementedError 427 428 def add_writer(self, fd, callback, *args): 429 raise NotImplementedError 430 431 def remove_writer(self, fd): 432 raise NotImplementedError 433 434 # Completion based I/O methods returning Futures. 435 436 def sock_recv(self, sock, nbytes): 437 raise NotImplementedError 438 439 def sock_sendall(self, sock, data): 440 raise NotImplementedError 441 442 def sock_connect(self, sock, address): 443 raise NotImplementedError 444 445 def sock_accept(self, sock): 446 raise NotImplementedError 447 448 # Signal handling. 449 450 def add_signal_handler(self, sig, callback, *args): 451 raise NotImplementedError 452 453 def remove_signal_handler(self, sig): 454 raise NotImplementedError 455 456 # Task factory. 457 458 def set_task_factory(self, factory): 459 raise NotImplementedError 460 461 def get_task_factory(self): 462 raise NotImplementedError 463 464 # Error handlers. 465 466 def set_exception_handler(self, handler): 467 raise NotImplementedError 468 469 def default_exception_handler(self, context): 470 raise NotImplementedError 471 472 def call_exception_handler(self, context): 473 raise NotImplementedError 474 475 # Debug flag management. 476 477 def get_debug(self): 478 raise NotImplementedError 479 480 def set_debug(self, enabled): 481 raise NotImplementedError 482 483 484 class AbstractEventLoopPolicy(object): 485 """Abstract policy for accessing the event loop.""" 486 487 def get_event_loop(self): 488 """Get the event loop for the current context. 489 490 Returns an event loop object implementing the BaseEventLoop interface, 491 or raises an exception in case no event loop has been set for the 492 current context and the current policy does not specify to create one. 493 494 It should never return None.""" 495 raise NotImplementedError 496 497 def set_event_loop(self, loop): 498 """Set the event loop for the current context to loop.""" 499 raise NotImplementedError 500 501 def new_event_loop(self): 502 """Create and return a new event loop object according to this 503 policy's rules. If there's need to set this loop as the event loop for 504 the current context, set_event_loop must be called explicitly.""" 505 raise NotImplementedError 506 507 # Child processes handling (Unix only). 508 509 def get_child_watcher(self): 510 "Get the watcher for child processes." 511 raise NotImplementedError 512 513 def set_child_watcher(self, watcher): 514 """Set the watcher for child processes.""" 515 raise NotImplementedError 516 517 518class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy): 519 """Default policy implementation for accessing the event loop. 520 521 In this policy, each thread has its own event loop. However, we 522 only automatically create an event loop by default for the main 523 thread; other threads by default have no event loop. 524 525 Other policies may have different rules (e.g. a single global 526 event loop, or automatically creating an event loop per thread, or 527 using some other notion of context to which an event loop is 528 associated). 529 """ 530 531 _loop_factory = None 532 533 class _Local(threading.local): 534 _loop = None 535 _set_called = False 536 537 def __init__(self): 538 self._local = self._Local() 539 540 def get_event_loop(self): 541 """Get the event loop. 542 543 This may be None or an instance of EventLoop. 544 """ 545 if (self._local._loop is None and 546 not self._local._set_called and 547 isinstance(threading.current_thread(), threading._MainThread)): 548 self.set_event_loop(self.new_event_loop()) 549 if self._local._loop is None: 550 raise RuntimeError('There is no current event loop in thread %r.' 551 % threading.current_thread().name) 552 return self._local._loop 553 554 def set_event_loop(self, loop): 555 """Set the event loop.""" 556 self._local._set_called = True 557 assert loop is None or isinstance(loop, AbstractEventLoop) 558 self._local._loop = loop 559 560 def new_event_loop(self): 561 """Create a new event loop. 562 563 You must call set_event_loop() to make this the current event 564 loop. 565 """ 566 return self._loop_factory() 567 568 569# Event loop policy. The policy itself is always global, even if the 570# policy's rules say that there is an event loop per thread (or other 571# notion of context). The default policy is installed by the first 572# call to get_event_loop_policy(). 573_event_loop_policy = None 574 575# Lock for protecting the on-the-fly creation of the event loop policy. 576_lock = threading.Lock() 577 578 579def _init_event_loop_policy(): 580 global _event_loop_policy 581 with _lock: 582 if _event_loop_policy is None: # pragma: no branch 583 from . import DefaultEventLoopPolicy 584 _event_loop_policy = DefaultEventLoopPolicy() 585 586 587def get_event_loop_policy(): 588 """Get the current event loop policy.""" 589 if _event_loop_policy is None: 590 _init_event_loop_policy() 591 return _event_loop_policy 592 593 594def set_event_loop_policy(policy): 595 """Set the current event loop policy. 596 597 If policy is None, the default policy is restored.""" 598 global _event_loop_policy 599 assert policy is None or isinstance(policy, AbstractEventLoopPolicy) 600 _event_loop_policy = policy 601 602 603def get_event_loop(): 604 """Equivalent to calling get_event_loop_policy().get_event_loop().""" 605 return get_event_loop_policy().get_event_loop() 606 607 608def set_event_loop(loop): 609 """Equivalent to calling get_event_loop_policy().set_event_loop(loop).""" 610 get_event_loop_policy().set_event_loop(loop) 611 612 613def new_event_loop(): 614 """Equivalent to calling get_event_loop_policy().new_event_loop().""" 615 return get_event_loop_policy().new_event_loop() 616 617 618def get_child_watcher(): 619 """Equivalent to calling get_event_loop_policy().get_child_watcher().""" 620 return get_event_loop_policy().get_child_watcher() 621 622 623def set_child_watcher(watcher): 624 """Equivalent to calling 625 get_event_loop_policy().set_child_watcher(watcher).""" 626 return get_event_loop_policy().set_child_watcher(watcher) 627