# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for L{twisted.internet.posixbase} and supporting code.
"""

from __future__ import division, absolute_import

from twisted.python.compat import _PY3
from twisted.trial.unittest import TestCase
from twisted.internet.defer import Deferred
from twisted.internet.posixbase import PosixReactorBase, _Waker
from twisted.internet.protocol import ServerFactory

# Skip message for the AF_UNIX tests, or None when they are able to run.
skipSockets = None
if _PY3:
    skipSockets = "Re-enable when Python 3 port supports AF_UNIX"
else:
    try:
        from twisted.internet import unix
        from twisted.test.test_unix import ClientProto
    except ImportError:
        skipSockets = "Platform does not support AF_UNIX sockets"

from twisted.internet.tcp import Port
from twisted.internet import reactor




class TrivialReactor(PosixReactorBase):
    """
    A minimal L{PosixReactorBase} subclass which only records the readers and
    writers it is given, in dictionaries, without monitoring them for events.

    @ivar _readers: A C{dict} whose keys are the currently added readers.
    @ivar _writers: A C{dict} whose keys are the currently added writers.
    """
    def __init__(self):
        # The dictionaries are created before the base initializer runs.
        # The tests below expect the waker to be present in C{_readers} as
        # soon as construction finishes, so C{addReader} is presumably
        # invoked during base initialization.
        self._readers = {}
        self._writers = {}
        PosixReactorBase.__init__(self)


    def addReader(self, reader):
        """
        Record C{reader} as an active reader.
        """
        self._readers[reader] = True


    def removeReader(self, reader):
        """
        Forget C{reader}.

        @raise KeyError: If C{reader} is not currently recorded.
        """
        del self._readers[reader]


    def addWriter(self, writer):
        """
        Record C{writer} as an active writer.
        """
        self._writers[writer] = True


    def removeWriter(self, writer):
        """
        Forget C{writer}.

        @raise KeyError: If C{writer} is not currently recorded.
        """
        del self._writers[writer]



class PosixReactorBaseTests(TestCase):
    """
    Tests for L{PosixReactorBase}.
    """

    def _checkWaker(self, reactor):
        """
        Assert that C{reactor} has a L{_Waker} which is registered both as an
        internal reader and as a regular reader.

        @param reactor: The L{TrivialReactor} instance to inspect.
        """
        self.assertIsInstance(reactor.waker, _Waker)
        self.assertIn(reactor.waker, reactor._internalReaders)
        self.assertIn(reactor.waker, reactor._readers)


    def test_wakerIsInternalReader(self):
        """
        When L{PosixReactorBase} is instantiated, it creates a waker and adds
        it to its internal readers set.
        """
        reactor = TrivialReactor()
        self._checkWaker(reactor)


    def test_removeAllSkipsInternalReaders(self):
        """
        Any L{IReadDescriptor}s in L{PosixReactorBase._internalReaders} are
        left alone by L{PosixReactorBase._removeAll}.
        """
        reactor = TrivialReactor()
        extra = object()
        reactor._internalReaders.add(extra)
        reactor.addReader(extra)
        reactor._removeAll(reactor._readers, reactor._writers)
        self._checkWaker(reactor)
        self.assertIn(extra, reactor._internalReaders)
        self.assertIn(extra, reactor._readers)


    def test_removeAllReturnsRemovedDescriptors(self):
        """
        L{PosixReactorBase._removeAll} returns a list of removed
        L{IReadDescriptor} and L{IWriteDescriptor} objects.
        """
        reactor = TrivialReactor()
        reader = object()
        writer = object()
        reactor.addReader(reader)
        reactor.addWriter(writer)
        removed = reactor._removeAll(
            reactor._readers, reactor._writers)
        # Compared as sets because the order of the returned list is not
        # something this test relies upon.
        self.assertEqual(set(removed), set([reader, writer]))
        self.assertNotIn(reader, reactor._readers)
        self.assertNotIn(writer, reactor._writers)



class TCPPortTests(TestCase):
    """
    Tests for L{twisted.internet.tcp.Port}.
    """

    if not isinstance(reactor, PosixReactorBase):
        skip = "Non-posixbase reactor"

    def test_connectionLostFailed(self):
        """
        L{Port.stopListening} returns a L{Deferred} which errbacks if
        L{Port.connectionLost} raises an exception.
        """
        port = Port(12345, ServerFactory())
        # NOTE(review): the port is never told to listen; setting
        # C{connected} by hand presumably makes C{stopListening} follow its
        # normal shutdown path -- confirm against L{Port}.
        port.connected = True
        # Replace connectionLost with a callable which always raises
        # ZeroDivisionError.
        port.connectionLost = lambda reason: 1 // 0
        return self.assertFailure(port.stopListening(), ZeroDivisionError)



class TimeoutReportReactor(PosixReactorBase):
    """
    A reactor which is just barely runnable and which cannot monitor any
    readers or writers, and which fires a L{Deferred} with the timeout
    passed to its C{doIteration} method as soon as that method is invoked.

    @ivar iterationTimeout: A L{Deferred} fired with the timeout passed to
        the first C{doIteration} call; C{None} once it has fired.
    @ivar now: The value returned by L{seconds}; tests advance it by hand.
    """
    def __init__(self):
        PosixReactorBase.__init__(self)
        self.iterationTimeout = Deferred()
        self.now = 100


    def addReader(self, reader):
        """
        Ignore the reader.  This is necessary because the waker will be
        added.  However, we won't actually monitor it for any events.
        """


    def removeAll(self):
        """
        There are no readers or writers, so there is nothing to remove.
        This will be called when the reactor stops, though, so it must be
        implemented.
        """
        return []


    def seconds(self):
        """
        Override the real clock with a deterministic one that can be easily
        controlled in a unit test.
        """
        return self.now


    def doIteration(self, timeout):
        """
        Fire C{iterationTimeout} with C{timeout} the first time this method
        is called; do nothing on any later call.

        @param timeout: The timeout value handed to this iteration.
        """
        d = self.iterationTimeout
        if d is not None:
            # Clear the attribute before firing so the Deferred is only ever
            # called back once.
            self.iterationTimeout = None
            d.callback(timeout)



class IterationTimeoutTests(TestCase):
    """
    Tests for the timeout argument L{PosixReactorBase.run} calls
    L{PosixReactorBase.doIteration} with in the presence of various delayed
    calls.
    """
    def _checkIterationTimeout(self, reactor):
        """
        Run C{reactor} until its first C{doIteration} call, stop it, and
        report the timeout that call was given.

        @param reactor: A L{TimeoutReportReactor} which has not been run yet.

        @return: The timeout passed to the first C{doIteration} call.
        """
        timeout = []
        reactor.iterationTimeout.addCallback(timeout.append)
        reactor.iterationTimeout.addCallback(lambda ignored: reactor.stop())
        reactor.run()
        return timeout[0]


    def test_noCalls(self):
        """
        If there are no delayed calls, C{doIteration} is called with a
        timeout of C{None}.
        """
        reactor = TimeoutReportReactor()
        timeout = self._checkIterationTimeout(reactor)
        self.assertEqual(timeout, None)


    def test_delayedCall(self):
        """
        If there is a delayed call, C{doIteration} is called with a timeout
        which is the difference between the current time and the time at
        which that call is to run.
        """
        reactor = TimeoutReportReactor()
        reactor.callLater(100, lambda: None)
        timeout = self._checkIterationTimeout(reactor)
        self.assertEqual(timeout, 100)


    def test_timePasses(self):
        """
        If a delayed call is scheduled and then some time passes, the
        timeout passed to C{doIteration} is reduced by the amount of time
        which passed.
        """
        reactor = TimeoutReportReactor()
        reactor.callLater(100, lambda: None)
        reactor.now += 25
        timeout = self._checkIterationTimeout(reactor)
        self.assertEqual(timeout, 75)


    def test_multipleDelayedCalls(self):
        """
        If there are several delayed calls, C{doIteration} is called with a
        timeout which is the difference between the current time and the
        time at which the earliest of those calls is to run.
        """
        reactor = TimeoutReportReactor()
        reactor.callLater(50, lambda: None)
        reactor.callLater(10, lambda: None)
        reactor.callLater(100, lambda: None)
        timeout = self._checkIterationTimeout(reactor)
        self.assertEqual(timeout, 10)


    def test_resetDelayedCall(self):
        """
        If a delayed call is reset, the timeout passed to C{doIteration} is
        based on the interval between the time when reset is called and the
        new delay of the call.
        """
        reactor = TimeoutReportReactor()
        call = reactor.callLater(50, lambda: None)
        reactor.now += 25
        call.reset(15)
        timeout = self._checkIterationTimeout(reactor)
        self.assertEqual(timeout, 15)


    def test_delayDelayedCall(self):
        """
        If a delayed call is re-delayed, the timeout passed to
        C{doIteration} is based on the remaining time before the call would
        have been made and the additional amount of time passed to the delay
        method.
        """
        reactor = TimeoutReportReactor()
        call = reactor.callLater(50, lambda: None)
        reactor.now += 10
        call.delay(20)
        timeout = self._checkIterationTimeout(reactor)
        # 40 remaining of the original 50, plus the 20 added by delay().
        self.assertEqual(timeout, 60)


    def test_cancelDelayedCall(self):
        """
        If the only delayed call is canceled, C{None} is the timeout passed
        to C{doIteration}.
        """
        reactor = TimeoutReportReactor()
        call = reactor.callLater(50, lambda: None)
        call.cancel()
        timeout = self._checkIterationTimeout(reactor)
        self.assertEqual(timeout, None)



class ConnectedDatagramPortTestCase(TestCase):
    """
    Test connected datagram UNIX sockets.
    """
    if skipSockets is not None:
        skip = skipSockets


    def test_connectionFailedDoesntCallLoseConnection(self):
        """
        L{ConnectedDatagramPort} does not call the deprecated C{loseConnection}
        in L{ConnectedDatagramPort.connectionFailed}.
        """
        def loseConnection():
            """
            Dummy C{loseConnection} method. C{loseConnection} is deprecated and
            should not get called.
            """
            self.fail("loseConnection is deprecated and should not get called.")

        port = unix.ConnectedDatagramPort(None, ClientProto())
        port.loseConnection = loseConnection
        port.connectionFailed("goodbye")


    def test_connectionFailedCallsStopListening(self):
        """
        L{ConnectedDatagramPort} calls L{ConnectedDatagramPort.stopListening}
        instead of the deprecated C{loseConnection} in
        L{ConnectedDatagramPort.connectionFailed}.
        """
        self.called = False

        def stopListening():
            """
            Dummy C{stopListening} method.
            """
            self.called = True

        port = unix.ConnectedDatagramPort(None, ClientProto())
        port.stopListening = stopListening
        port.connectionFailed("goodbye")
        self.assertEqual(self.called, True)