1"""Event loop and event loop policy."""
2from __future__ import absolute_import
3
4__all__ = ['AbstractEventLoopPolicy',
5           'AbstractEventLoop', 'AbstractServer',
6           'Handle', 'TimerHandle',
7           'get_event_loop_policy', 'set_event_loop_policy',
8           'get_event_loop', 'set_event_loop', 'new_event_loop',
9           'get_child_watcher', 'set_child_watcher',
10           ]
11
12import functools
13import inspect
14import socket
15import subprocess
16import sys
17import threading
18import traceback
19try:
20    import reprlib   # Python 3
21except ImportError:
22    import repr as reprlib   # Python 2
23
24try:
25    import asyncio
26except (ImportError, SyntaxError):
27    # ignore SyntaxError for convenience: ignore SyntaxError caused by "yield
28    # from" if asyncio module is in the Python path
29    asyncio = None
30
31from trollius import compat
32
33
34def _get_function_source(func):
35    if compat.PY34:
36        func = inspect.unwrap(func)
37    elif hasattr(func, '__wrapped__'):
38        func = func.__wrapped__
39    if inspect.isfunction(func):
40        code = func.__code__
41        return (code.co_filename, code.co_firstlineno)
42    if isinstance(func, functools.partial):
43        return _get_function_source(func.func)
44    if compat.PY34 and isinstance(func, functools.partialmethod):
45        return _get_function_source(func.func)
46    return None
47
48
49def _format_args(args):
50    """Format function arguments.
51
52    Special case for a single parameter: ('hello',) is formatted as ('hello').
53    """
54    # use reprlib to limit the length of the output
55    args_repr = reprlib.repr(args)
56    if len(args) == 1 and args_repr.endswith(',)'):
57        args_repr = args_repr[:-2] + ')'
58    return args_repr
59
60
61def _format_callback(func, args, suffix=''):
62    if isinstance(func, functools.partial):
63        if args is not None:
64            suffix = _format_args(args) + suffix
65        return _format_callback(func.func, func.args, suffix)
66
67    if hasattr(func, '__qualname__'):
68        func_repr = getattr(func, '__qualname__')
69    elif hasattr(func, '__name__'):
70        func_repr = getattr(func, '__name__')
71    else:
72        func_repr = repr(func)
73
74    if args is not None:
75        func_repr += _format_args(args)
76    if suffix:
77        func_repr += suffix
78    return func_repr
79
80def _format_callback_source(func, args):
81    func_repr = _format_callback(func, args)
82    source = _get_function_source(func)
83    if source:
84        func_repr += ' at %s:%s' % source
85    return func_repr
86
87
88class Handle(object):
89    """Object returned by callback registration methods."""
90
91    __slots__ = ('_callback', '_args', '_cancelled', '_loop',
92                 '_source_traceback', '_repr', '__weakref__')
93
94    def __init__(self, callback, args, loop):
95        assert not isinstance(callback, Handle), 'A Handle is not a callback'
96        self._loop = loop
97        self._callback = callback
98        self._args = args
99        self._cancelled = False
100        self._repr = None
101        if self._loop.get_debug():
102            self._source_traceback = traceback.extract_stack(sys._getframe(1))
103        else:
104            self._source_traceback = None
105
106    def _repr_info(self):
107        info = [self.__class__.__name__]
108        if self._cancelled:
109            info.append('cancelled')
110        if self._callback is not None:
111            info.append(_format_callback_source(self._callback, self._args))
112        if self._source_traceback:
113            frame = self._source_traceback[-1]
114            info.append('created at %s:%s' % (frame[0], frame[1]))
115        return info
116
117    def __repr__(self):
118        if self._repr is not None:
119            return self._repr
120        info = self._repr_info()
121        return '<%s>' % ' '.join(info)
122
123    def cancel(self):
124        if not self._cancelled:
125            self._cancelled = True
126            if self._loop.get_debug():
127                # Keep a representation in debug mode to keep callback and
128                # parameters. For example, to log the warning
129                # "Executing <Handle...> took 2.5 second"
130                self._repr = repr(self)
131            self._callback = None
132            self._args = None
133
134    def _run(self):
135        try:
136            self._callback(*self._args)
137        except Exception as exc:
138            cb = _format_callback_source(self._callback, self._args)
139            msg = 'Exception in callback {0}'.format(cb)
140            context = {
141                'message': msg,
142                'exception': exc,
143                'handle': self,
144            }
145            if self._source_traceback:
146                context['source_traceback'] = self._source_traceback
147            self._loop.call_exception_handler(context)
148        self = None  # Needed to break cycles when an exception occurs.
149
150
151class TimerHandle(Handle):
152    """Object returned by timed callback registration methods."""
153
154    __slots__ = ['_scheduled', '_when']
155
156    def __init__(self, when, callback, args, loop):
157        assert when is not None
158        super(TimerHandle, self).__init__(callback, args, loop)
159        if self._source_traceback:
160            del self._source_traceback[-1]
161        self._when = when
162        self._scheduled = False
163
164    def _repr_info(self):
165        info = super(TimerHandle, self)._repr_info()
166        pos = 2 if self._cancelled else 1
167        info.insert(pos, 'when=%s' % self._when)
168        return info
169
170    def __hash__(self):
171        return hash(self._when)
172
173    def __lt__(self, other):
174        return self._when < other._when
175
176    def __le__(self, other):
177        if self._when < other._when:
178            return True
179        return self.__eq__(other)
180
181    def __gt__(self, other):
182        return self._when > other._when
183
184    def __ge__(self, other):
185        if self._when > other._when:
186            return True
187        return self.__eq__(other)
188
189    def __eq__(self, other):
190        if isinstance(other, TimerHandle):
191            return (self._when == other._when and
192                    self._callback == other._callback and
193                    self._args == other._args and
194                    self._cancelled == other._cancelled)
195        return NotImplemented
196
197    def __ne__(self, other):
198        equal = self.__eq__(other)
199        return NotImplemented if equal is NotImplemented else not equal
200
201    def cancel(self):
202        if not self._cancelled:
203            self._loop._timer_handle_cancelled(self)
204        super(TimerHandle, self).cancel()
205
206
207class AbstractServer(object):
208    """Abstract server returned by create_server()."""
209
210    def close(self):
211        """Stop serving.  This leaves existing connections open."""
212        return NotImplemented
213
214    def wait_closed(self):
215        """Coroutine to wait until service is closed."""
216        return NotImplemented
217
218
219if asyncio is not None:
220    # Reuse asyncio classes so asyncio.set_event_loop() and
221    # asyncio.set_event_loop_policy() accept Trollius event loop and trollius
222    # event loop policy
223    AbstractEventLoop = asyncio.AbstractEventLoop
224    AbstractEventLoopPolicy = asyncio.AbstractEventLoopPolicy
225else:
226    class AbstractEventLoop(object):
227        """Abstract event loop."""
228
229        # Running and stopping the event loop.
230
231        def run_forever(self):
232            """Run the event loop until stop() is called."""
233            raise NotImplementedError
234
235        def run_until_complete(self, future):
236            """Run the event loop until a Future is done.
237
238            Return the Future's result, or raise its exception.
239            """
240            raise NotImplementedError
241
242        def stop(self):
243            """Stop the event loop as soon as reasonable.
244
245            Exactly how soon that is may depend on the implementation, but
246            no more I/O callbacks should be scheduled.
247            """
248            raise NotImplementedError
249
250        def is_running(self):
251            """Return whether the event loop is currently running."""
252            raise NotImplementedError
253
254        def is_closed(self):
255            """Returns True if the event loop was closed."""
256            raise NotImplementedError
257
258        def close(self):
259            """Close the loop.
260
261            The loop should not be running.
262
263            This is idempotent and irreversible.
264
265            No other methods should be called after this one.
266            """
267            raise NotImplementedError
268
269        # Methods scheduling callbacks.  All these return Handles.
270
271        def _timer_handle_cancelled(self, handle):
272            """Notification that a TimerHandle has been cancelled."""
273            raise NotImplementedError
274
275        def call_soon(self, callback, *args):
276            return self.call_later(0, callback, *args)
277
278        def call_later(self, delay, callback, *args):
279            raise NotImplementedError
280
281        def call_at(self, when, callback, *args):
282            raise NotImplementedError
283
284        def time(self):
285            raise NotImplementedError
286
287        # Method scheduling a coroutine object: create a task.
288
289        def create_task(self, coro):
290            raise NotImplementedError
291
292        # Methods for interacting with threads.
293
294        def call_soon_threadsafe(self, callback, *args):
295            raise NotImplementedError
296
297        def run_in_executor(self, executor, func, *args):
298            raise NotImplementedError
299
300        def set_default_executor(self, executor):
301            raise NotImplementedError
302
303        # Network I/O methods returning Futures.
304
305        def getaddrinfo(self, host, port, family=0, type=0, proto=0, flags=0):
306            raise NotImplementedError
307
308        def getnameinfo(self, sockaddr, flags=0):
309            raise NotImplementedError
310
311        def create_connection(self, protocol_factory, host=None, port=None,
312                              ssl=None, family=0, proto=0, flags=0, sock=None,
313                              local_addr=None, server_hostname=None):
314            raise NotImplementedError
315
316        def create_server(self, protocol_factory, host=None, port=None,
317                          family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
318                          sock=None, backlog=100, ssl=None, reuse_address=None):
319            """A coroutine which creates a TCP server bound to host and port.
320
321            The return value is a Server object which can be used to stop
322            the service.
323
324            If host is an empty string or None all interfaces are assumed
325            and a list of multiple sockets will be returned (most likely
326            one for IPv4 and another one for IPv6).
327
328            family can be set to either AF_INET or AF_INET6 to force the
329            socket to use IPv4 or IPv6. If not set it will be determined
330            from host (defaults to AF_UNSPEC).
331
332            flags is a bitmask for getaddrinfo().
333
334            sock can optionally be specified in order to use a preexisting
335            socket object.
336
337            backlog is the maximum number of queued connections passed to
338            listen() (defaults to 100).
339
340            ssl can be set to an SSLContext to enable SSL over the
341            accepted connections.
342
343            reuse_address tells the kernel to reuse a local socket in
344            TIME_WAIT state, without waiting for its natural timeout to
345            expire. If not specified will automatically be set to True on
346            UNIX.
347            """
348            raise NotImplementedError
349
350        def create_unix_connection(self, protocol_factory, path,
351                                   ssl=None, sock=None,
352                                   server_hostname=None):
353            raise NotImplementedError
354
355        def create_unix_server(self, protocol_factory, path,
356                               sock=None, backlog=100, ssl=None):
357            """A coroutine which creates a UNIX Domain Socket server.
358
359            The return value is a Server object, which can be used to stop
360            the service.
361
362            path is a str, representing a file systsem path to bind the
363            server socket to.
364
365            sock can optionally be specified in order to use a preexisting
366            socket object.
367
368            backlog is the maximum number of queued connections passed to
369            listen() (defaults to 100).
370
371            ssl can be set to an SSLContext to enable SSL over the
372            accepted connections.
373            """
374            raise NotImplementedError
375
376        def create_datagram_endpoint(self, protocol_factory,
377                                     local_addr=None, remote_addr=None,
378                                     family=0, proto=0, flags=0):
379            raise NotImplementedError
380
381        # Pipes and subprocesses.
382
383        def connect_read_pipe(self, protocol_factory, pipe):
384            """Register read pipe in event loop. Set the pipe to non-blocking mode.
385
386            protocol_factory should instantiate object with Protocol interface.
387            pipe is a file-like object.
388            Return pair (transport, protocol), where transport supports the
389            ReadTransport interface."""
390            # The reason to accept file-like object instead of just file descriptor
391            # is: we need to own pipe and close it at transport finishing
392            # Can got complicated errors if pass f.fileno(),
393            # close fd in pipe transport then close f and vise versa.
394            raise NotImplementedError
395
396        def connect_write_pipe(self, protocol_factory, pipe):
397            """Register write pipe in event loop.
398
399            protocol_factory should instantiate object with BaseProtocol interface.
400            Pipe is file-like object already switched to nonblocking.
401            Return pair (transport, protocol), where transport support
402            WriteTransport interface."""
403            # The reason to accept file-like object instead of just file descriptor
404            # is: we need to own pipe and close it at transport finishing
405            # Can got complicated errors if pass f.fileno(),
406            # close fd in pipe transport then close f and vise versa.
407            raise NotImplementedError
408
409        def subprocess_shell(self, protocol_factory, cmd, stdin=subprocess.PIPE,
410                             stdout=subprocess.PIPE, stderr=subprocess.PIPE,
411                             **kwargs):
412            raise NotImplementedError
413
414        def subprocess_exec(self, protocol_factory, *args, **kwargs):
415            raise NotImplementedError
416
417        # Ready-based callback registration methods.
418        # The add_*() methods return None.
419        # The remove_*() methods return True if something was removed,
420        # False if there was nothing to delete.
421
422        def add_reader(self, fd, callback, *args):
423            raise NotImplementedError
424
425        def remove_reader(self, fd):
426            raise NotImplementedError
427
428        def add_writer(self, fd, callback, *args):
429            raise NotImplementedError
430
431        def remove_writer(self, fd):
432            raise NotImplementedError
433
434        # Completion based I/O methods returning Futures.
435
436        def sock_recv(self, sock, nbytes):
437            raise NotImplementedError
438
439        def sock_sendall(self, sock, data):
440            raise NotImplementedError
441
442        def sock_connect(self, sock, address):
443            raise NotImplementedError
444
445        def sock_accept(self, sock):
446            raise NotImplementedError
447
448        # Signal handling.
449
450        def add_signal_handler(self, sig, callback, *args):
451            raise NotImplementedError
452
453        def remove_signal_handler(self, sig):
454            raise NotImplementedError
455
456        # Task factory.
457
458        def set_task_factory(self, factory):
459            raise NotImplementedError
460
461        def get_task_factory(self):
462            raise NotImplementedError
463
464        # Error handlers.
465
466        def set_exception_handler(self, handler):
467            raise NotImplementedError
468
469        def default_exception_handler(self, context):
470            raise NotImplementedError
471
472        def call_exception_handler(self, context):
473            raise NotImplementedError
474
475        # Debug flag management.
476
477        def get_debug(self):
478            raise NotImplementedError
479
480        def set_debug(self, enabled):
481            raise NotImplementedError
482
483
484    class AbstractEventLoopPolicy(object):
485        """Abstract policy for accessing the event loop."""
486
487        def get_event_loop(self):
488            """Get the event loop for the current context.
489
490            Returns an event loop object implementing the BaseEventLoop interface,
491            or raises an exception in case no event loop has been set for the
492            current context and the current policy does not specify to create one.
493
494            It should never return None."""
495            raise NotImplementedError
496
497        def set_event_loop(self, loop):
498            """Set the event loop for the current context to loop."""
499            raise NotImplementedError
500
501        def new_event_loop(self):
502            """Create and return a new event loop object according to this
503            policy's rules. If there's need to set this loop as the event loop for
504            the current context, set_event_loop must be called explicitly."""
505            raise NotImplementedError
506
507        # Child processes handling (Unix only).
508
509        def get_child_watcher(self):
510            "Get the watcher for child processes."
511            raise NotImplementedError
512
513        def set_child_watcher(self, watcher):
514            """Set the watcher for child processes."""
515            raise NotImplementedError
516
517
518class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
519    """Default policy implementation for accessing the event loop.
520
521    In this policy, each thread has its own event loop.  However, we
522    only automatically create an event loop by default for the main
523    thread; other threads by default have no event loop.
524
525    Other policies may have different rules (e.g. a single global
526    event loop, or automatically creating an event loop per thread, or
527    using some other notion of context to which an event loop is
528    associated).
529    """
530
531    _loop_factory = None
532
533    class _Local(threading.local):
534        _loop = None
535        _set_called = False
536
537    def __init__(self):
538        self._local = self._Local()
539
540    def get_event_loop(self):
541        """Get the event loop.
542
543        This may be None or an instance of EventLoop.
544        """
545        if (self._local._loop is None and
546            not self._local._set_called and
547            isinstance(threading.current_thread(), threading._MainThread)):
548            self.set_event_loop(self.new_event_loop())
549        if self._local._loop is None:
550            raise RuntimeError('There is no current event loop in thread %r.'
551                               % threading.current_thread().name)
552        return self._local._loop
553
554    def set_event_loop(self, loop):
555        """Set the event loop."""
556        self._local._set_called = True
557        assert loop is None or isinstance(loop, AbstractEventLoop)
558        self._local._loop = loop
559
560    def new_event_loop(self):
561        """Create a new event loop.
562
563        You must call set_event_loop() to make this the current event
564        loop.
565        """
566        return self._loop_factory()
567
568
569# Event loop policy.  The policy itself is always global, even if the
570# policy's rules say that there is an event loop per thread (or other
571# notion of context).  The default policy is installed by the first
572# call to get_event_loop_policy().
573_event_loop_policy = None
574
575# Lock for protecting the on-the-fly creation of the event loop policy.
576_lock = threading.Lock()
577
578
579def _init_event_loop_policy():
580    global _event_loop_policy
581    with _lock:
582        if _event_loop_policy is None:  # pragma: no branch
583            from . import DefaultEventLoopPolicy
584            _event_loop_policy = DefaultEventLoopPolicy()
585
586
587def get_event_loop_policy():
588    """Get the current event loop policy."""
589    if _event_loop_policy is None:
590        _init_event_loop_policy()
591    return _event_loop_policy
592
593
594def set_event_loop_policy(policy):
595    """Set the current event loop policy.
596
597    If policy is None, the default policy is restored."""
598    global _event_loop_policy
599    assert policy is None or isinstance(policy, AbstractEventLoopPolicy)
600    _event_loop_policy = policy
601
602
603def get_event_loop():
604    """Equivalent to calling get_event_loop_policy().get_event_loop()."""
605    return get_event_loop_policy().get_event_loop()
606
607
608def set_event_loop(loop):
609    """Equivalent to calling get_event_loop_policy().set_event_loop(loop)."""
610    get_event_loop_policy().set_event_loop(loop)
611
612
613def new_event_loop():
614    """Equivalent to calling get_event_loop_policy().new_event_loop()."""
615    return get_event_loop_policy().new_event_loop()
616
617
618def get_child_watcher():
619    """Equivalent to calling get_event_loop_policy().get_child_watcher()."""
620    return get_event_loop_policy().get_child_watcher()
621
622
623def set_child_watcher(watcher):
624    """Equivalent to calling
625    get_event_loop_policy().set_child_watcher(watcher)."""
626    return get_event_loop_policy().set_child_watcher(watcher)
627