1# Copyright (c) Twisted Matrix Laboratories. 2# See LICENSE for details. 3 4""" 5Utilities for unit testing reactor implementations. 6 7The main feature of this module is L{ReactorBuilder}, a base class for use when 8writing interface/blackbox tests for reactor implementations. Test case classes 9for reactor features should subclass L{ReactorBuilder} instead of 10L{SynchronousTestCase}. All of the features of L{SynchronousTestCase} will be 11available. Additionally, the tests will automatically be applied to all 12available reactor implementations. 13""" 14 15from __future__ import division, absolute_import 16 17__metaclass__ = type 18 19__all__ = ['TestTimeoutError', 'ReactorBuilder', 'needsRunningReactor'] 20 21import os, signal, time 22 23from twisted.python.compat import _PY3 24from twisted.trial.unittest import SynchronousTestCase, SkipTest 25from twisted.trial.util import DEFAULT_TIMEOUT_DURATION, acquireAttribute 26from twisted.python.runtime import platform 27from twisted.python.reflect import namedAny 28from twisted.python.deprecate import _fullyQualifiedName as fullyQualifiedName 29 30from twisted.python import log 31from twisted.python.failure import Failure 32 33 34# Access private APIs. 35if platform.isWindows(): 36 process = None 37elif _PY3: 38 # Enable this on Python 3 when twisted.internet.process is ported. 39 # See #5968. 40 process = None 41else: 42 from twisted.internet import process 43 44 45 46class TestTimeoutError(Exception): 47 """ 48 The reactor was still running after the timeout period elapsed in 49 L{ReactorBuilder.runReactor}. 50 """ 51 52 53 54def needsRunningReactor(reactor, thunk): 55 """ 56 Various functions within these tests need an already-running reactor at 57 some point. They need to stop the reactor when the test has completed, and 58 that means calling reactor.stop(). However, reactor.stop() raises an 59 exception if the reactor isn't already running, so if the L{Deferred} that 60 a particular API under test returns fires synchronously (as especially an 61 endpoint's C{connect()} method may do, if the connect is to a local 62 interface address) then the test won't be able to stop the reactor being 63 tested and finish. So this calls C{thunk} only once C{reactor} is running. 64 65 (This is just an alias for 66 L{twisted.internet.interfaces.IReactorCore.callWhenRunning} on the given 67 reactor parameter, in order to centrally reference the above paragraph and 68 repeating it everywhere as a comment.) 69 70 @param reactor: the L{twisted.internet.interfaces.IReactorCore} under test 71 72 @param thunk: a 0-argument callable, which eventually finishes the test in 73 question, probably in a L{Deferred} callback. 74 """ 75 reactor.callWhenRunning(thunk) 76 77 78 79class ReactorBuilder: 80 """ 81 L{SynchronousTestCase} mixin which provides a reactor-creation API. This 82 mixin defines C{setUp} and C{tearDown}, so mix it in before 83 L{SynchronousTestCase} or call its methods from the overridden ones in the 84 subclass. 85 86 @cvar skippedReactors: A dict mapping FQPN strings of reactors for 87 which the tests defined by this class will be skipped to strings 88 giving the skip message. 89 @cvar requiredInterfaces: A C{list} of interfaces which the reactor must 90 provide or these tests will be skipped. The default, C{None}, means 91 that no interfaces are required. 92 @ivar reactorFactory: A no-argument callable which returns the reactor to 93 use for testing. 94 @ivar originalHandler: The SIGCHLD handler which was installed when setUp 95 ran and which will be re-installed when tearDown runs. 96 @ivar _reactors: A list of FQPN strings giving the reactors for which 97 L{SynchronousTestCase}s will be created. 98 """ 99 100 _reactors = [ 101 # Select works everywhere 102 "twisted.internet.selectreactor.SelectReactor", 103 ] 104 105 if platform.isWindows(): 106 # PortableGtkReactor is only really interesting on Windows, 107 # but not really Windows specific; if you want you can 108 # temporarily move this up to the all-platforms list to test 109 # it on other platforms. It's not there in general because 110 # it's not _really_ worth it to support on other platforms, 111 # since no one really wants to use it on other platforms. 112 _reactors.extend([ 113 "twisted.internet.gtk2reactor.PortableGtkReactor", 114 "twisted.internet.gireactor.PortableGIReactor", 115 "twisted.internet.gtk3reactor.PortableGtk3Reactor", 116 "twisted.internet.win32eventreactor.Win32Reactor", 117 "twisted.internet.iocpreactor.reactor.IOCPReactor"]) 118 else: 119 _reactors.extend([ 120 "twisted.internet.glib2reactor.Glib2Reactor", 121 "twisted.internet.gtk2reactor.Gtk2Reactor", 122 "twisted.internet.gireactor.GIReactor", 123 "twisted.internet.gtk3reactor.Gtk3Reactor"]) 124 if platform.isMacOSX(): 125 _reactors.append("twisted.internet.cfreactor.CFReactor") 126 else: 127 _reactors.extend([ 128 "twisted.internet.pollreactor.PollReactor", 129 "twisted.internet.epollreactor.EPollReactor"]) 130 if not platform.isLinux(): 131 # Presumably Linux is not going to start supporting kqueue, so 132 # skip even trying this configuration. 133 _reactors.extend([ 134 # Support KQueue on non-OS-X POSIX platforms for now. 135 "twisted.internet.kqreactor.KQueueReactor", 136 ]) 137 138 reactorFactory = None 139 originalHandler = None 140 requiredInterfaces = None 141 skippedReactors = {} 142 143 def setUp(self): 144 """ 145 Clear the SIGCHLD handler, if there is one, to ensure an environment 146 like the one which exists prior to a call to L{reactor.run}. 147 """ 148 if not platform.isWindows(): 149 self.originalHandler = signal.signal(signal.SIGCHLD, signal.SIG_DFL) 150 151 152 def tearDown(self): 153 """ 154 Restore the original SIGCHLD handler and reap processes as long as 155 there seem to be any remaining. 156 """ 157 if self.originalHandler is not None: 158 signal.signal(signal.SIGCHLD, self.originalHandler) 159 if process is not None: 160 begin = time.time() 161 while process.reapProcessHandlers: 162 log.msg( 163 "ReactorBuilder.tearDown reaping some processes %r" % ( 164 process.reapProcessHandlers,)) 165 process.reapAllProcesses() 166 167 # The process should exit on its own. However, if it 168 # doesn't, we're stuck in this loop forever. To avoid 169 # hanging the test suite, eventually give the process some 170 # help exiting and move on. 171 time.sleep(0.001) 172 if time.time() - begin > 60: 173 for pid in process.reapProcessHandlers: 174 os.kill(pid, signal.SIGKILL) 175 raise Exception( 176 "Timeout waiting for child processes to exit: %r" % ( 177 process.reapProcessHandlers,)) 178 179 180 def unbuildReactor(self, reactor): 181 """ 182 Clean up any resources which may have been allocated for the given 183 reactor by its creation or by a test which used it. 184 """ 185 # Chris says: 186 # 187 # XXX These explicit calls to clean up the waker (and any other 188 # internal readers) should become obsolete when bug #3063 is 189 # fixed. -radix, 2008-02-29. Fortunately it should probably cause an 190 # error when bug #3063 is fixed, so it should be removed in the same 191 # branch that fixes it. 192 # 193 # -exarkun 194 reactor._uninstallHandler() 195 if getattr(reactor, '_internalReaders', None) is not None: 196 for reader in reactor._internalReaders: 197 reactor.removeReader(reader) 198 reader.connectionLost(None) 199 reactor._internalReaders.clear() 200 201 # Here's an extra thing unrelated to wakers but necessary for 202 # cleaning up after the reactors we make. -exarkun 203 reactor.disconnectAll() 204 205 # It would also be bad if any timed calls left over were allowed to 206 # run. 207 calls = reactor.getDelayedCalls() 208 for c in calls: 209 c.cancel() 210 211 212 def buildReactor(self): 213 """ 214 Create and return a reactor using C{self.reactorFactory}. 215 """ 216 try: 217 from twisted.internet.cfreactor import CFReactor 218 from twisted.internet import reactor as globalReactor 219 except ImportError: 220 pass 221 else: 222 if (isinstance(globalReactor, CFReactor) 223 and self.reactorFactory is CFReactor): 224 raise SkipTest( 225 "CFReactor uses APIs which manipulate global state, " 226 "so it's not safe to run its own reactor-builder tests " 227 "under itself") 228 try: 229 reactor = self.reactorFactory() 230 except: 231 # Unfortunately, not all errors which result in a reactor 232 # being unusable are detectable without actually 233 # instantiating the reactor. So we catch some more here 234 # and skip the test if necessary. We also log it to aid 235 # with debugging, but flush the logged error so the test 236 # doesn't fail. 237 log.err(None, "Failed to install reactor") 238 self.flushLoggedErrors() 239 raise SkipTest(Failure().getErrorMessage()) 240 else: 241 if self.requiredInterfaces is not None: 242 missing = [ 243 required for required in self.requiredInterfaces 244 if not required.providedBy(reactor)] 245 if missing: 246 self.unbuildReactor(reactor) 247 raise SkipTest("%s does not provide %s" % ( 248 fullyQualifiedName(reactor.__class__), 249 ",".join([fullyQualifiedName(x) for x in missing]))) 250 self.addCleanup(self.unbuildReactor, reactor) 251 return reactor 252 253 254 def getTimeout(self): 255 """ 256 Determine how long to run the test before considering it failed. 257 258 @return: A C{int} or C{float} giving a number of seconds. 259 """ 260 return acquireAttribute(self._parents, 'timeout', DEFAULT_TIMEOUT_DURATION) 261 262 263 def runReactor(self, reactor, timeout=None): 264 """ 265 Run the reactor for at most the given amount of time. 266 267 @param reactor: The reactor to run. 268 269 @type timeout: C{int} or C{float} 270 @param timeout: The maximum amount of time, specified in seconds, to 271 allow the reactor to run. If the reactor is still running after 272 this much time has elapsed, it will be stopped and an exception 273 raised. If C{None}, the default test method timeout imposed by 274 Trial will be used. This depends on the L{IReactorTime} 275 implementation of C{reactor} for correct operation. 276 277 @raise TestTimeoutError: If the reactor is still running after 278 C{timeout} seconds. 279 """ 280 if timeout is None: 281 timeout = self.getTimeout() 282 283 timedOut = [] 284 def stop(): 285 timedOut.append(None) 286 reactor.stop() 287 288 timedOutCall = reactor.callLater(timeout, stop) 289 reactor.run() 290 if timedOut: 291 raise TestTimeoutError( 292 "reactor still running after %s seconds" % (timeout,)) 293 else: 294 timedOutCall.cancel() 295 296 297 def makeTestCaseClasses(cls): 298 """ 299 Create a L{SynchronousTestCase} subclass which mixes in C{cls} for each 300 known reactor and return a dict mapping their names to them. 301 """ 302 classes = {} 303 for reactor in cls._reactors: 304 shortReactorName = reactor.split(".")[-1] 305 name = (cls.__name__ + "." + shortReactorName).replace(".", "_") 306 class testcase(cls, SynchronousTestCase): 307 __module__ = cls.__module__ 308 if reactor in cls.skippedReactors: 309 skip = cls.skippedReactors[reactor] 310 try: 311 reactorFactory = namedAny(reactor) 312 except: 313 skip = Failure().getErrorMessage() 314 testcase.__name__ = name 315 classes[testcase.__name__] = testcase 316 return classes 317 makeTestCaseClasses = classmethod(makeTestCaseClasses) 318