1# Copyright (c) Twisted Matrix Laboratories.
2# See LICENSE for details.
3
4"""
5Utilities for unit testing reactor implementations.
6
7The main feature of this module is L{ReactorBuilder}, a base class for use when
8writing interface/blackbox tests for reactor implementations.  Test case classes
9for reactor features should subclass L{ReactorBuilder} instead of
10L{SynchronousTestCase}.  All of the features of L{SynchronousTestCase} will be
11available.  Additionally, the tests will automatically be applied to all
12available reactor implementations.
13"""
14
15from __future__ import division, absolute_import
16
17__metaclass__ = type
18
19__all__ = ['TestTimeoutError', 'ReactorBuilder', 'needsRunningReactor']
20
21import os, signal, time
22
23from twisted.python.compat import _PY3
24from twisted.trial.unittest import SynchronousTestCase, SkipTest
25from twisted.trial.util import DEFAULT_TIMEOUT_DURATION, acquireAttribute
26from twisted.python.runtime import platform
27from twisted.python.reflect import namedAny
28from twisted.python.deprecate import _fullyQualifiedName as fullyQualifiedName
29
30from twisted.python import log
31from twisted.python.failure import Failure
32
33
34# Access private APIs.
35if platform.isWindows():
36    process = None
37elif _PY3:
38    # Enable this on Python 3 when twisted.internet.process is ported.
39    # See #5968.
40    process = None
41else:
42    from twisted.internet import process
43
44
45
46class TestTimeoutError(Exception):
47    """
48    The reactor was still running after the timeout period elapsed in
49    L{ReactorBuilder.runReactor}.
50    """
51
52
53
54def needsRunningReactor(reactor, thunk):
55    """
56    Various functions within these tests need an already-running reactor at
57    some point.  They need to stop the reactor when the test has completed, and
58    that means calling reactor.stop().  However, reactor.stop() raises an
59    exception if the reactor isn't already running, so if the L{Deferred} that
60    a particular API under test returns fires synchronously (as especially an
61    endpoint's C{connect()} method may do, if the connect is to a local
62    interface address) then the test won't be able to stop the reactor being
63    tested and finish.  So this calls C{thunk} only once C{reactor} is running.
64
65    (This is just an alias for
66    L{twisted.internet.interfaces.IReactorCore.callWhenRunning} on the given
67    reactor parameter, in order to centrally reference the above paragraph and
68    repeating it everywhere as a comment.)
69
70    @param reactor: the L{twisted.internet.interfaces.IReactorCore} under test
71
72    @param thunk: a 0-argument callable, which eventually finishes the test in
73        question, probably in a L{Deferred} callback.
74    """
75    reactor.callWhenRunning(thunk)
76
77
78
79class ReactorBuilder:
80    """
81    L{SynchronousTestCase} mixin which provides a reactor-creation API.  This
82    mixin defines C{setUp} and C{tearDown}, so mix it in before
83    L{SynchronousTestCase} or call its methods from the overridden ones in the
84    subclass.
85
86    @cvar skippedReactors: A dict mapping FQPN strings of reactors for
87        which the tests defined by this class will be skipped to strings
88        giving the skip message.
89    @cvar requiredInterfaces: A C{list} of interfaces which the reactor must
90        provide or these tests will be skipped.  The default, C{None}, means
91        that no interfaces are required.
92    @ivar reactorFactory: A no-argument callable which returns the reactor to
93        use for testing.
94    @ivar originalHandler: The SIGCHLD handler which was installed when setUp
95        ran and which will be re-installed when tearDown runs.
96    @ivar _reactors: A list of FQPN strings giving the reactors for which
97        L{SynchronousTestCase}s will be created.
98    """
99
100    _reactors = [
101        # Select works everywhere
102        "twisted.internet.selectreactor.SelectReactor",
103        ]
104
105    if platform.isWindows():
106        # PortableGtkReactor is only really interesting on Windows,
107        # but not really Windows specific; if you want you can
108        # temporarily move this up to the all-platforms list to test
109        # it on other platforms.  It's not there in general because
110        # it's not _really_ worth it to support on other platforms,
111        # since no one really wants to use it on other platforms.
112        _reactors.extend([
113                "twisted.internet.gtk2reactor.PortableGtkReactor",
114                "twisted.internet.gireactor.PortableGIReactor",
115                "twisted.internet.gtk3reactor.PortableGtk3Reactor",
116                "twisted.internet.win32eventreactor.Win32Reactor",
117                "twisted.internet.iocpreactor.reactor.IOCPReactor"])
118    else:
119        _reactors.extend([
120                "twisted.internet.glib2reactor.Glib2Reactor",
121                "twisted.internet.gtk2reactor.Gtk2Reactor",
122                "twisted.internet.gireactor.GIReactor",
123                "twisted.internet.gtk3reactor.Gtk3Reactor"])
124        if platform.isMacOSX():
125            _reactors.append("twisted.internet.cfreactor.CFReactor")
126        else:
127            _reactors.extend([
128                    "twisted.internet.pollreactor.PollReactor",
129                    "twisted.internet.epollreactor.EPollReactor"])
130            if not platform.isLinux():
131                # Presumably Linux is not going to start supporting kqueue, so
132                # skip even trying this configuration.
133                _reactors.extend([
134                        # Support KQueue on non-OS-X POSIX platforms for now.
135                        "twisted.internet.kqreactor.KQueueReactor",
136                        ])
137
138    reactorFactory = None
139    originalHandler = None
140    requiredInterfaces = None
141    skippedReactors = {}
142
143    def setUp(self):
144        """
145        Clear the SIGCHLD handler, if there is one, to ensure an environment
146        like the one which exists prior to a call to L{reactor.run}.
147        """
148        if not platform.isWindows():
149            self.originalHandler = signal.signal(signal.SIGCHLD, signal.SIG_DFL)
150
151
152    def tearDown(self):
153        """
154        Restore the original SIGCHLD handler and reap processes as long as
155        there seem to be any remaining.
156        """
157        if self.originalHandler is not None:
158            signal.signal(signal.SIGCHLD, self.originalHandler)
159        if process is not None:
160            begin = time.time()
161            while process.reapProcessHandlers:
162                log.msg(
163                    "ReactorBuilder.tearDown reaping some processes %r" % (
164                        process.reapProcessHandlers,))
165                process.reapAllProcesses()
166
167                # The process should exit on its own.  However, if it
168                # doesn't, we're stuck in this loop forever.  To avoid
169                # hanging the test suite, eventually give the process some
170                # help exiting and move on.
171                time.sleep(0.001)
172                if time.time() - begin > 60:
173                    for pid in process.reapProcessHandlers:
174                        os.kill(pid, signal.SIGKILL)
175                    raise Exception(
176                        "Timeout waiting for child processes to exit: %r" % (
177                            process.reapProcessHandlers,))
178
179
180    def unbuildReactor(self, reactor):
181        """
182        Clean up any resources which may have been allocated for the given
183        reactor by its creation or by a test which used it.
184        """
185        # Chris says:
186        #
187        # XXX These explicit calls to clean up the waker (and any other
188        # internal readers) should become obsolete when bug #3063 is
189        # fixed. -radix, 2008-02-29. Fortunately it should probably cause an
190        # error when bug #3063 is fixed, so it should be removed in the same
191        # branch that fixes it.
192        #
193        # -exarkun
194        reactor._uninstallHandler()
195        if getattr(reactor, '_internalReaders', None) is not None:
196            for reader in reactor._internalReaders:
197                reactor.removeReader(reader)
198                reader.connectionLost(None)
199            reactor._internalReaders.clear()
200
201        # Here's an extra thing unrelated to wakers but necessary for
202        # cleaning up after the reactors we make.  -exarkun
203        reactor.disconnectAll()
204
205        # It would also be bad if any timed calls left over were allowed to
206        # run.
207        calls = reactor.getDelayedCalls()
208        for c in calls:
209            c.cancel()
210
211
212    def buildReactor(self):
213        """
214        Create and return a reactor using C{self.reactorFactory}.
215        """
216        try:
217            from twisted.internet.cfreactor import CFReactor
218            from twisted.internet import reactor as globalReactor
219        except ImportError:
220            pass
221        else:
222            if (isinstance(globalReactor, CFReactor)
223                and self.reactorFactory is CFReactor):
224                raise SkipTest(
225                    "CFReactor uses APIs which manipulate global state, "
226                    "so it's not safe to run its own reactor-builder tests "
227                    "under itself")
228        try:
229            reactor = self.reactorFactory()
230        except:
231            # Unfortunately, not all errors which result in a reactor
232            # being unusable are detectable without actually
233            # instantiating the reactor.  So we catch some more here
234            # and skip the test if necessary.  We also log it to aid
235            # with debugging, but flush the logged error so the test
236            # doesn't fail.
237            log.err(None, "Failed to install reactor")
238            self.flushLoggedErrors()
239            raise SkipTest(Failure().getErrorMessage())
240        else:
241            if self.requiredInterfaces is not None:
242                missing = [
243                    required for required in self.requiredInterfaces
244                    if not required.providedBy(reactor)]
245                if missing:
246                    self.unbuildReactor(reactor)
247                    raise SkipTest("%s does not provide %s" % (
248                        fullyQualifiedName(reactor.__class__),
249                        ",".join([fullyQualifiedName(x) for x in missing])))
250        self.addCleanup(self.unbuildReactor, reactor)
251        return reactor
252
253
254    def getTimeout(self):
255        """
256        Determine how long to run the test before considering it failed.
257
258        @return: A C{int} or C{float} giving a number of seconds.
259        """
260        return acquireAttribute(self._parents, 'timeout', DEFAULT_TIMEOUT_DURATION)
261
262
263    def runReactor(self, reactor, timeout=None):
264        """
265        Run the reactor for at most the given amount of time.
266
267        @param reactor: The reactor to run.
268
269        @type timeout: C{int} or C{float}
270        @param timeout: The maximum amount of time, specified in seconds, to
271            allow the reactor to run.  If the reactor is still running after
272            this much time has elapsed, it will be stopped and an exception
273            raised.  If C{None}, the default test method timeout imposed by
274            Trial will be used.  This depends on the L{IReactorTime}
275            implementation of C{reactor} for correct operation.
276
277        @raise TestTimeoutError: If the reactor is still running after
278            C{timeout} seconds.
279        """
280        if timeout is None:
281            timeout = self.getTimeout()
282
283        timedOut = []
284        def stop():
285            timedOut.append(None)
286            reactor.stop()
287
288        timedOutCall = reactor.callLater(timeout, stop)
289        reactor.run()
290        if timedOut:
291            raise TestTimeoutError(
292                "reactor still running after %s seconds" % (timeout,))
293        else:
294            timedOutCall.cancel()
295
296
297    def makeTestCaseClasses(cls):
298        """
299        Create a L{SynchronousTestCase} subclass which mixes in C{cls} for each
300        known reactor and return a dict mapping their names to them.
301        """
302        classes = {}
303        for reactor in cls._reactors:
304            shortReactorName = reactor.split(".")[-1]
305            name = (cls.__name__ + "." + shortReactorName).replace(".", "_")
306            class testcase(cls, SynchronousTestCase):
307                __module__ = cls.__module__
308                if reactor in cls.skippedReactors:
309                    skip = cls.skippedReactors[reactor]
310                try:
311                    reactorFactory = namedAny(reactor)
312                except:
313                    skip = Failure().getErrorMessage()
314            testcase.__name__ = name
315            classes[testcase.__name__] = testcase
316        return classes
317    makeTestCaseClasses = classmethod(makeTestCaseClasses)
318