"""
Twisted integration
-------------------

This module provides a very simple way to integrate your tests with the
Twisted_ event loop.

You must import this module *before* importing anything from Twisted itself!

Example::

  from nose.twistedtools import reactor, deferred

  @deferred()
  def test_resolve():
      return reactor.resolve("www.python.org")

Or, more realistically::

  @deferred(timeout=5.0)
  def test_resolve():
      d = reactor.resolve("www.python.org")
      def check_ip(ip):
          assert ip == "67.15.36.43"
      d.addCallback(check_ip)
      return d

.. _Twisted: http://twistedmatrix.com/trac/
"""

import sys
try:
    from Queue import Queue, Empty  # Python 2
except ImportError:
    from queue import Queue, Empty  # Python 3
from nose.tools import make_decorator, TimeExpired

__all__ = [
    'threaded_reactor', 'reactor', 'deferred', 'TimeExpired',
    'stop_reactor'
]

# Thread currently running the reactor, or None when no thread has been
# started (or after stop_reactor() has shut it down).
_twisted_thread = None


if sys.version_info[0] >= 3:
    def _reraise(exc_type, exc_value, tb):
        """Re-raise an exception captured with ``sys.exc_info()``,
        keeping its original traceback."""
        try:
            if exc_value is None:
                exc_value = exc_type()
            raise exc_value.with_traceback(tb)
        finally:
            # Break the frame <-> traceback reference cycle.
            exc_value = None
            tb = None
else:
    # The three-argument raise is a syntax error on Python 3, so it has to
    # be hidden from the Python 3 compiler inside a string.
    exec("def _reraise(exc_type, exc_value, tb):\n"
         "    raise exc_type, exc_value, tb\n")


def threaded_reactor():
    """
    Start the Twisted reactor in a separate thread, if not already done.

    Returns a ``(reactor, thread)`` tuple, or ``(None, None)`` when Twisted
    cannot be imported.
    The thread is a daemon thread, so it will not keep the interpreter alive
    once all the tests are done.
    """
    global _twisted_thread
    try:
        from twisted.internet import reactor
    except ImportError:
        return None, None
    if not _twisted_thread:
        from threading import Thread
        # Signal handlers can only be installed from the main thread, so
        # they must be disabled when the reactor runs in a worker thread.
        _twisted_thread = Thread(
            target=lambda: reactor.run(installSignalHandlers=False))
        _twisted_thread.daemon = True
        _twisted_thread.start()
    return reactor, _twisted_thread


# Export global reactor variable, as Twisted does
reactor, reactor_thread = threaded_reactor()


def stop_reactor():
    """Stop the reactor and join the reactor thread until it stops.
    Call this function in teardown at the module or package level to
    reset the twisted system after your tests. You *must* do this if
    you mix tests using these tools and tests using twisted.trial.

    Does nothing when Twisted is not available or when no reactor thread
    is currently recorded as running.
    """
    global _twisted_thread

    if reactor is None or _twisted_thread is None:
        return

    # reactor.stop() must be invoked from within the reactor thread.
    reactor.callFromThread(reactor.stop)
    _twisted_thread.join()
    # Cancel whatever is still scheduled so it cannot leak into later tests.
    for delayed_call in reactor.getDelayedCalls():
        if delayed_call.active():
            delayed_call.cancel()
    _twisted_thread = None


def deferred(timeout=None):
    """
    By wrapping a test function with this decorator, you can return a
    twisted Deferred and the test will wait for the deferred to be triggered.
    The whole test function will run inside the Twisted event loop.

    The optional timeout parameter specifies the maximum duration of the test.
    The difference with timed() is that timed() will still wait for the test
    to end, while deferred() will stop the test when its timeout has expired.
    The latter is more desirable when dealing with network tests, because
    the result may actually never arrive.

    If the callback is triggered, the test has passed.
    If the errback is triggered or the timeout expires, the test has failed.

    Raises ImportError if Twisted cannot be imported, and TypeError if
    *timeout* is neither a number nor None.

    Example::

        @deferred(timeout=5.0)
        def test_resolve():
            return reactor.resolve("www.python.org")

    Attention! If you combine this decorator with other decorators (like
    "raises"), deferred() must be called *first*!

    In other words, this is good::

        @raises(DNSLookupError)
        @deferred()
        def test_error():
            return reactor.resolve("xxxjhjhj.biz")

    and this is bad::

        @deferred()
        @raises(DNSLookupError)
        def test_error():
            return reactor.resolve("xxxjhjhj.biz")
    """
    # Make sure the reactor thread is running; only the reactor is needed.
    reactor, _ = threaded_reactor()
    if reactor is None:
        raise ImportError("twisted is not available or could not be imported")
    # Check for common syntax mistake
    # (otherwise, tests can be silently ignored
    # if one writes "@deferred" instead of "@deferred()")
    if timeout is not None:
        try:
            timeout + 0
        except TypeError:
            raise TypeError("'timeout' argument must be a number or None")

    def decorate(func):
        def wrapper(*args, **kargs):
            # Carries the outcome from the reactor thread back to the test
            # thread: None on success, a sys.exc_info() triple on failure.
            q = Queue()

            def callback(value):
                q.put(None)

            def errback(failure):
                # Retrieve and save full exception info
                try:
                    failure.raiseException()
                except:
                    # Deliberately catch-all: whatever the failure wraps
                    # must be forwarded to the test thread.
                    q.put(sys.exc_info())

            def g():
                try:
                    d = func(*args, **kargs)
                    try:
                        d.addCallbacks(callback, errback)
                    # Check for a common mistake and display a nice error
                    # message
                    except AttributeError:
                        raise TypeError("you must return a twisted Deferred "
                                        "from your test case!")
                # Catch exceptions raised in the test body (from the
                # Twisted thread)
                except:
                    q.put(sys.exc_info())

            reactor.callFromThread(g)
            try:
                error = q.get(timeout=timeout)
            except Empty:
                raise TimeExpired("timeout expired before end of test (%f s.)"
                                  % timeout)
            # Re-raise all exceptions
            if error is not None:
                exc_type, exc_value, tb = error
                _reraise(exc_type, exc_value, tb)
        wrapper = make_decorator(func)(wrapper)
        return wrapper
    return decorate