1"""
2Twisted integration
3-------------------
4
5This module provides a very simple way to integrate your tests with the
6Twisted_ event loop.
7
8You must import this module *before* importing anything from Twisted itself!
9
10Example::
11
12  from nose.twistedtools import reactor, deferred
13
14  @deferred()
15  def test_resolve():
16      return reactor.resolve("www.python.org")
17
18Or, more realistically::
19
20  @deferred(timeout=5.0)
21  def test_resolve():
22      d = reactor.resolve("www.python.org")
23      def check_ip(ip):
24          assert ip == "67.15.36.43"
25      d.addCallback(check_ip)
26      return d
27
28.. _Twisted: http://twistedmatrix.com/trac/
29"""
30
31import sys
32from Queue import Queue, Empty
33from nose.tools import make_decorator, TimeExpired
34
# Public names exported by "from nose.twistedtools import *".
__all__ = [
    'threaded_reactor',
    'reactor',
    'deferred',
    'TimeExpired',
    'stop_reactor',
]

# Thread currently running the reactor. It is None until threaded_reactor()
# starts one, and it is reset to None by stop_reactor().
_twisted_thread = None
41
def threaded_reactor():
    """
    Start the Twisted reactor in a separate thread, if not already done.

    The reactor is run with ``installSignalHandlers=False`` because it is
    not run from the main thread. The thread is a daemon thread, so it will
    automatically be destroyed when all the tests are done.

    Returns a ``(reactor, thread)`` tuple: the Twisted reactor and the
    thread that runs it. If Twisted cannot be imported, ``(None, None)`` is
    returned instead.
    """
    global _twisted_thread
    try:
        # Imported lazily so that this module stays importable (and simply
        # reports "no reactor") when Twisted is not installed.
        from twisted.internet import reactor
    except ImportError:
        return None, None
    if not _twisted_thread:
        from threading import Thread
        _twisted_thread = Thread(
            target=reactor.run,
            kwargs={'installSignalHandlers': False})
        _twisted_thread.setDaemon(True)
        _twisted_thread.start()
    return reactor, _twisted_thread
61
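# Export the global reactor (as Twisted does), along with the thread that
# runs it. Importing this module therefore starts the reactor thread; both
# names are None if Twisted could not be imported.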
63reactor, reactor_thread = threaded_reactor()
64
65
def stop_reactor():
    """Stop the reactor and join the reactor thread until it stops.

    Call this function in teardown at the module or package level to
    reset the twisted system after your tests. You *must* do this if
    you mix tests using these tools and tests using twisted.trial.

    Any delayed calls still pending once the reactor thread has finished
    are cancelled. Calling this function when no reactor thread is running
    (Twisted is unavailable, or the reactor was already stopped) does
    nothing.
    """
    global _twisted_thread

    # Join the thread that is running the reactor *now*, not the snapshot
    # taken at import time, which is stale if the thread has been replaced.
    thread = _twisted_thread
    if reactor is None or thread is None:
        return

    # reactor.stop() must be invoked from within the reactor thread.
    reactor.callFromThread(reactor.stop)
    thread.join()
    for call in reactor.getDelayedCalls():
        if call.active():
            call.cancel()
    _twisted_thread = None
84
85
86def deferred(timeout=None):
87    """
88    By wrapping a test function with this decorator, you can return a
89    twisted Deferred and the test will wait for the deferred to be triggered.
90    The whole test function will run inside the Twisted event loop.
91
92    The optional timeout parameter specifies the maximum duration of the test.
93    The difference with timed() is that timed() will still wait for the test
94    to end, while deferred() will stop the test when its timeout has expired.
95    The latter is more desireable when dealing with network tests, because
96    the result may actually never arrive.
97
98    If the callback is triggered, the test has passed.
99    If the errback is triggered or the timeout expires, the test has failed.
100
101    Example::
102
103        @deferred(timeout=5.0)
104        def test_resolve():
105            return reactor.resolve("www.python.org")
106
107    Attention! If you combine this decorator with other decorators (like
108    "raises"), deferred() must be called *first*!
109
110    In other words, this is good::
111
112        @raises(DNSLookupError)
113        @deferred()
114        def test_error():
115            return reactor.resolve("xxxjhjhj.biz")
116
117    and this is bad::
118
119        @deferred()
120        @raises(DNSLookupError)
121        def test_error():
122            return reactor.resolve("xxxjhjhj.biz")
123    """
124    reactor, reactor_thread = threaded_reactor()
125    if reactor is None:
126        raise ImportError("twisted is not available or could not be imported")
127    # Check for common syntax mistake
128    # (otherwise, tests can be silently ignored
129    # if one writes "@deferred" instead of "@deferred()")
130    try:
131        timeout is None or timeout + 0
132    except TypeError:
133        raise TypeError("'timeout' argument must be a number or None")
134
135    def decorate(func):
136        def wrapper(*args, **kargs):
137            q = Queue()
138            def callback(value):
139                q.put(None)
140            def errback(failure):
141                # Retrieve and save full exception info
142                try:
143                    failure.raiseException()
144                except:
145                    q.put(sys.exc_info())
146            def g():
147                try:
148                    d = func(*args, **kargs)
149                    try:
150                        d.addCallbacks(callback, errback)
151                    # Check for a common mistake and display a nice error
152                    # message
153                    except AttributeError:
154                        raise TypeError("you must return a twisted Deferred "
155                                        "from your test case!")
156                # Catch exceptions raised in the test body (from the
157                # Twisted thread)
158                except:
159                    q.put(sys.exc_info())
160            reactor.callFromThread(g)
161            try:
162                error = q.get(timeout=timeout)
163            except Empty:
164                raise TimeExpired("timeout expired before end of test (%f s.)"
165                                  % timeout)
166            # Re-raise all exceptions
167            if error is not None:
168                exc_type, exc_value, tb = error
169                raise exc_type, exc_value, tb
170        wrapper = make_decorator(func)(wrapper)
171        return wrapper
172    return decorate
173
174