1import time  # noqa: F401
2from rpyc.lib import Timeout
3from rpyc.lib.compat import TimeoutError as AsyncResultTimeout
4
5
6class AsyncResult(object):
7    """*AsyncResult* represents a computation that occurs in the background and
8    will eventually have a result. Use the :attr:`value` property to access the
9    result (which will block if the result has not yet arrived).
10    """
11    __slots__ = ["_conn", "_is_ready", "_is_exc", "_callbacks", "_obj", "_ttl"]
12
13    def __init__(self, conn):
14        self._conn = conn
15        self._is_ready = False
16        self._is_exc = None
17        self._obj = None
18        self._callbacks = []
19        self._ttl = Timeout(None)
20
21    def __repr__(self):
22        if self._is_ready:
23            state = "ready"
24        elif self._is_exc:
25            state = "error"
26        elif self.expired:
27            state = "expired"
28        else:
29            state = "pending"
30        return "<AsyncResult object (%s) at 0x%08x>" % (state, id(self))
31
32    def __call__(self, is_exc, obj):
33        if self.expired:
34            return
35        self._is_exc = is_exc
36        self._obj = obj
37        self._is_ready = True
38        for cb in self._callbacks:
39            cb(self)
40        del self._callbacks[:]
41
42    def wait(self):
43        """Waits for the result to arrive. If the AsyncResult object has an
44        expiry set, and the result did not arrive within that timeout,
45        an :class:`AsyncResultTimeout` exception is raised"""
46        while not self._is_ready and not self._ttl.expired():
47            self._conn.serve(self._ttl)
48        if not self._is_ready:
49            raise AsyncResultTimeout("result expired")
50
51    def add_callback(self, func):
52        """Adds a callback to be invoked when the result arrives. The callback
53        function takes a single argument, which is the current AsyncResult
54        (``self``). If the result has already arrived, the function is invoked
55        immediately.
56
57        :param func: the callback function to add
58        """
59        if self._is_ready:
60            func(self)
61        else:
62            self._callbacks.append(func)
63
64    def set_expiry(self, timeout):
65        """Sets the expiry time (in seconds, relative to now) or ``None`` for
66        unlimited time
67
68        :param timeout: the expiry time in seconds or ``None``
69        """
70        self._ttl = Timeout(timeout)
71
72    @property
73    def ready(self):
74        """Indicates whether the result has arrived"""
75        if self._is_ready:
76            return True
77        if self._ttl.expired():
78            return False
79        self._conn.poll_all()
80        return self._is_ready
81
82    @property
83    def error(self):
84        """Indicates whether the returned result is an exception"""
85        return self.ready and self._is_exc
86
87    @property
88    def expired(self):
89        """Indicates whether the AsyncResult has expired"""
90        return not self._is_ready and self._ttl.expired()
91
92    @property
93    def value(self):
94        """Returns the result of the operation. If the result has not yet
95        arrived, accessing this property will wait for it. If the result does
96        not arrive before the expiry time elapses, :class:`AsyncResultTimeout`
97        is raised. If the returned result is an exception, it will be raised
98        here. Otherwise, the result is returned directly.
99        """
100        self.wait()
101        if self._is_exc:
102            raise self._obj
103        else:
104            return self._obj
105