1# -*- test-case-name: foolscap.test.test_eventual -*- 2 3from twisted.internet import reactor, defer 4from twisted.python import log 5 6class _SimpleCallQueue(object): 7 # XXX TODO: merge epsilon.cooperator in, and make this more complete. 8 def __init__(self): 9 self._events = [] 10 self._flushObservers = [] 11 self._timer = None 12 13 def append(self, cb, args, kwargs): 14 self._events.append((cb, args, kwargs)) 15 if not self._timer: 16 self._timer = reactor.callLater(0, self._turn) 17 18 def _turn(self): 19 self._timer = None 20 # flush all the messages that are currently in the queue. If anything 21 # gets added to the queue while we're doing this, those events will 22 # be put off until the next turn. 23 events, self._events = self._events, [] 24 for cb, args, kwargs in events: 25 try: 26 cb(*args, **kwargs) 27 except: 28 log.err() 29 if not self._events: 30 observers, self._flushObservers = self._flushObservers, [] 31 for o in observers: 32 o.callback(None) 33 34 def flush(self): 35 """Return a Deferred that will fire (with None) when the call queue 36 is completely empty.""" 37 if not self._events: 38 return defer.succeed(None) 39 d = defer.Deferred() 40 self._flushObservers.append(d) 41 return d 42 43 44_theSimpleQueue = _SimpleCallQueue() 45 46def eventually(cb, *args, **kwargs): 47 """This is the eventual-send operation, used as a plan-coordination 48 primitive. The callable will be invoked (with args and kwargs) in a later 49 reactor turn. Doing 'eventually(a); eventually(b)' guarantees that a will 50 be called before b. 51 52 Any exceptions that occur in the callable will be logged with log.err(). 53 If you really want to ignore them, be sure to provide a callable that 54 catches those exceptions. 55 56 This function returns None. If you care to know when the callable was 57 run, be sure to provide a callable that notifies somebody. 58 """ 59 _theSimpleQueue.append(cb, args, kwargs) 60 61 62def fireEventually(value=None): 63 """This returns a Deferred which will fire in a later reactor turn, after 64 the current call stack has been completed, and after all other deferreds 65 previously scheduled with callEventually(). 66 """ 67 d = defer.Deferred() 68 eventually(d.callback, value) 69 return d 70 71def flushEventualQueue(_ignored=None): 72 """This returns a Deferred which fires when the eventual-send queue is 73 finally empty. This is useful to wait upon as the last step of a Trial 74 test method. 75 """ 76 return _theSimpleQueue.flush() 77