1import time # noqa: F401 2from rpyc.lib import Timeout 3from rpyc.lib.compat import TimeoutError as AsyncResultTimeout 4 5 6class AsyncResult(object): 7 """*AsyncResult* represents a computation that occurs in the background and 8 will eventually have a result. Use the :attr:`value` property to access the 9 result (which will block if the result has not yet arrived). 10 """ 11 __slots__ = ["_conn", "_is_ready", "_is_exc", "_callbacks", "_obj", "_ttl"] 12 13 def __init__(self, conn): 14 self._conn = conn 15 self._is_ready = False 16 self._is_exc = None 17 self._obj = None 18 self._callbacks = [] 19 self._ttl = Timeout(None) 20 21 def __repr__(self): 22 if self._is_ready: 23 state = "ready" 24 elif self._is_exc: 25 state = "error" 26 elif self.expired: 27 state = "expired" 28 else: 29 state = "pending" 30 return "<AsyncResult object (%s) at 0x%08x>" % (state, id(self)) 31 32 def __call__(self, is_exc, obj): 33 if self.expired: 34 return 35 self._is_exc = is_exc 36 self._obj = obj 37 self._is_ready = True 38 for cb in self._callbacks: 39 cb(self) 40 del self._callbacks[:] 41 42 def wait(self): 43 """Waits for the result to arrive. If the AsyncResult object has an 44 expiry set, and the result did not arrive within that timeout, 45 an :class:`AsyncResultTimeout` exception is raised""" 46 while not self._is_ready and not self._ttl.expired(): 47 self._conn.serve(self._ttl) 48 if not self._is_ready: 49 raise AsyncResultTimeout("result expired") 50 51 def add_callback(self, func): 52 """Adds a callback to be invoked when the result arrives. The callback 53 function takes a single argument, which is the current AsyncResult 54 (``self``). If the result has already arrived, the function is invoked 55 immediately. 56 57 :param func: the callback function to add 58 """ 59 if self._is_ready: 60 func(self) 61 else: 62 self._callbacks.append(func) 63 64 def set_expiry(self, timeout): 65 """Sets the expiry time (in seconds, relative to now) or ``None`` for 66 unlimited time 67 68 :param timeout: the expiry time in seconds or ``None`` 69 """ 70 self._ttl = Timeout(timeout) 71 72 @property 73 def ready(self): 74 """Indicates whether the result has arrived""" 75 if self._is_ready: 76 return True 77 if self._ttl.expired(): 78 return False 79 self._conn.poll_all() 80 return self._is_ready 81 82 @property 83 def error(self): 84 """Indicates whether the returned result is an exception""" 85 return self.ready and self._is_exc 86 87 @property 88 def expired(self): 89 """Indicates whether the AsyncResult has expired""" 90 return not self._is_ready and self._ttl.expired() 91 92 @property 93 def value(self): 94 """Returns the result of the operation. If the result has not yet 95 arrived, accessing this property will wait for it. If the result does 96 not arrive before the expiry time elapses, :class:`AsyncResultTimeout` 97 is raised. If the returned result is an exception, it will be raised 98 here. Otherwise, the result is returned directly. 99 """ 100 self.wait() 101 if self._is_exc: 102 raise self._obj 103 else: 104 return self._obj 105