1# Copyright (c) Twisted Matrix Laboratories.
2# See LICENSE for details.
3
4"""
5Tests for L{twisted.internet.posixbase} and supporting code.
6"""
7
8from __future__ import division, absolute_import
9
10from twisted.python.compat import _PY3
11from twisted.trial.unittest import TestCase
12from twisted.internet.defer import Deferred
13from twisted.internet.posixbase import PosixReactorBase, _Waker
14from twisted.internet.protocol import ServerFactory
15
16skipSockets = None
17if _PY3:
18    skipSockets = "Re-enable when Python 3 port supports AF_UNIX"
19else:
20    try:
21        from twisted.internet import unix
22        from twisted.test.test_unix import ClientProto
23    except ImportError:
24        skipSockets = "Platform does not support AF_UNIX sockets"
25
26from twisted.internet.tcp import Port
27from twisted.internet import reactor
28
29
30
31
32class TrivialReactor(PosixReactorBase):
33    def __init__(self):
34        self._readers = {}
35        self._writers = {}
36        PosixReactorBase.__init__(self)
37
38
39    def addReader(self, reader):
40        self._readers[reader] = True
41
42
43    def removeReader(self, reader):
44        del self._readers[reader]
45
46
47    def addWriter(self, writer):
48        self._writers[writer] = True
49
50
51    def removeWriter(self, writer):
52        del self._writers[writer]
53
54
55
56class PosixReactorBaseTests(TestCase):
57    """
58    Tests for L{PosixReactorBase}.
59    """
60
61    def _checkWaker(self, reactor):
62        self.assertIsInstance(reactor.waker, _Waker)
63        self.assertIn(reactor.waker, reactor._internalReaders)
64        self.assertIn(reactor.waker, reactor._readers)
65
66
67    def test_wakerIsInternalReader(self):
68        """
69        When L{PosixReactorBase} is instantiated, it creates a waker and adds
70        it to its internal readers set.
71        """
72        reactor = TrivialReactor()
73        self._checkWaker(reactor)
74
75
76    def test_removeAllSkipsInternalReaders(self):
77        """
78        Any L{IReadDescriptors} in L{PosixReactorBase._internalReaders} are
79        left alone by L{PosixReactorBase._removeAll}.
80        """
81        reactor = TrivialReactor()
82        extra = object()
83        reactor._internalReaders.add(extra)
84        reactor.addReader(extra)
85        reactor._removeAll(reactor._readers, reactor._writers)
86        self._checkWaker(reactor)
87        self.assertIn(extra, reactor._internalReaders)
88        self.assertIn(extra, reactor._readers)
89
90
91    def test_removeAllReturnsRemovedDescriptors(self):
92        """
93        L{PosixReactorBase._removeAll} returns a list of removed
94        L{IReadDescriptor} and L{IWriteDescriptor} objects.
95        """
96        reactor = TrivialReactor()
97        reader = object()
98        writer = object()
99        reactor.addReader(reader)
100        reactor.addWriter(writer)
101        removed = reactor._removeAll(
102            reactor._readers, reactor._writers)
103        self.assertEqual(set(removed), set([reader, writer]))
104        self.assertNotIn(reader, reactor._readers)
105        self.assertNotIn(writer, reactor._writers)
106
107
108
109class TCPPortTests(TestCase):
110    """
111    Tests for L{twisted.internet.tcp.Port}.
112    """
113
114    if not isinstance(reactor, PosixReactorBase):
115        skip = "Non-posixbase reactor"
116
117    def test_connectionLostFailed(self):
118        """
119        L{Port.stopListening} returns a L{Deferred} which errbacks if
120        L{Port.connectionLost} raises an exception.
121        """
122        port = Port(12345, ServerFactory())
123        port.connected = True
124        port.connectionLost = lambda reason: 1 // 0
125        return self.assertFailure(port.stopListening(), ZeroDivisionError)
126
127
128
129class TimeoutReportReactor(PosixReactorBase):
130    """
131    A reactor which is just barely runnable and which cannot monitor any
132    readers or writers, and which fires a L{Deferred} with the timeout
133    passed to its C{doIteration} method as soon as that method is invoked.
134    """
135    def __init__(self):
136        PosixReactorBase.__init__(self)
137        self.iterationTimeout = Deferred()
138        self.now = 100
139
140
141    def addReader(self, reader):
142        """
143        Ignore the reader.  This is necessary because the waker will be
144        added.  However, we won't actually monitor it for any events.
145        """
146
147
148    def removeAll(self):
149        """
150        There are no readers or writers, so there is nothing to remove.
151        This will be called when the reactor stops, though, so it must be
152        implemented.
153        """
154        return []
155
156
157    def seconds(self):
158        """
159        Override the real clock with a deterministic one that can be easily
160        controlled in a unit test.
161        """
162        return self.now
163
164
165    def doIteration(self, timeout):
166        d = self.iterationTimeout
167        if d is not None:
168            self.iterationTimeout = None
169            d.callback(timeout)
170
171
172
173class IterationTimeoutTests(TestCase):
174    """
175    Tests for the timeout argument L{PosixReactorBase.run} calls
176    L{PosixReactorBase.doIteration} with in the presence of various delayed
177    calls.
178    """
179    def _checkIterationTimeout(self, reactor):
180        timeout = []
181        reactor.iterationTimeout.addCallback(timeout.append)
182        reactor.iterationTimeout.addCallback(lambda ignored: reactor.stop())
183        reactor.run()
184        return timeout[0]
185
186
187    def test_noCalls(self):
188        """
189        If there are no delayed calls, C{doIteration} is called with a
190        timeout of C{None}.
191        """
192        reactor = TimeoutReportReactor()
193        timeout = self._checkIterationTimeout(reactor)
194        self.assertEqual(timeout, None)
195
196
197    def test_delayedCall(self):
198        """
199        If there is a delayed call, C{doIteration} is called with a timeout
200        which is the difference between the current time and the time at
201        which that call is to run.
202        """
203        reactor = TimeoutReportReactor()
204        reactor.callLater(100, lambda: None)
205        timeout = self._checkIterationTimeout(reactor)
206        self.assertEqual(timeout, 100)
207
208
209    def test_timePasses(self):
210        """
211        If a delayed call is scheduled and then some time passes, the
212        timeout passed to C{doIteration} is reduced by the amount of time
213        which passed.
214        """
215        reactor = TimeoutReportReactor()
216        reactor.callLater(100, lambda: None)
217        reactor.now += 25
218        timeout = self._checkIterationTimeout(reactor)
219        self.assertEqual(timeout, 75)
220
221
222    def test_multipleDelayedCalls(self):
223        """
224        If there are several delayed calls, C{doIteration} is called with a
225        timeout which is the difference between the current time and the
226        time at which the earlier of the two calls is to run.
227        """
228        reactor = TimeoutReportReactor()
229        reactor.callLater(50, lambda: None)
230        reactor.callLater(10, lambda: None)
231        reactor.callLater(100, lambda: None)
232        timeout = self._checkIterationTimeout(reactor)
233        self.assertEqual(timeout, 10)
234
235
236    def test_resetDelayedCall(self):
237        """
238        If a delayed call is reset, the timeout passed to C{doIteration} is
239        based on the interval between the time when reset is called and the
240        new delay of the call.
241        """
242        reactor = TimeoutReportReactor()
243        call = reactor.callLater(50, lambda: None)
244        reactor.now += 25
245        call.reset(15)
246        timeout = self._checkIterationTimeout(reactor)
247        self.assertEqual(timeout, 15)
248
249
250    def test_delayDelayedCall(self):
251        """
252        If a delayed call is re-delayed, the timeout passed to
253        C{doIteration} is based on the remaining time before the call would
254        have been made and the additional amount of time passed to the delay
255        method.
256        """
257        reactor = TimeoutReportReactor()
258        call = reactor.callLater(50, lambda: None)
259        reactor.now += 10
260        call.delay(20)
261        timeout = self._checkIterationTimeout(reactor)
262        self.assertEqual(timeout, 60)
263
264
265    def test_cancelDelayedCall(self):
266        """
267        If the only delayed call is canceled, C{None} is the timeout passed
268        to C{doIteration}.
269        """
270        reactor = TimeoutReportReactor()
271        call = reactor.callLater(50, lambda: None)
272        call.cancel()
273        timeout = self._checkIterationTimeout(reactor)
274        self.assertEqual(timeout, None)
275
276
277
278class ConnectedDatagramPortTestCase(TestCase):
279    """
280    Test connected datagram UNIX sockets.
281    """
282    if skipSockets is not None:
283        skip = skipSockets
284
285
286    def test_connectionFailedDoesntCallLoseConnection(self):
287        """
288        L{ConnectedDatagramPort} does not call the deprecated C{loseConnection}
289        in L{ConnectedDatagramPort.connectionFailed}.
290        """
291        def loseConnection():
292            """
293            Dummy C{loseConnection} method. C{loseConnection} is deprecated and
294            should not get called.
295            """
296            self.fail("loseConnection is deprecated and should not get called.")
297
298        port = unix.ConnectedDatagramPort(None, ClientProto())
299        port.loseConnection = loseConnection
300        port.connectionFailed("goodbye")
301
302
303    def test_connectionFailedCallsStopListening(self):
304        """
305        L{ConnectedDatagramPort} calls L{ConnectedDatagramPort.stopListening}
306        instead of the deprecated C{loseConnection} in
307        L{ConnectedDatagramPort.connectionFailed}.
308        """
309        self.called = False
310
311        def stopListening():
312            """
313            Dummy C{stopListening} method.
314            """
315            self.called = True
316
317        port = unix.ConnectedDatagramPort(None, ClientProto())
318        port.stopListening = stopListening
319        port.connectionFailed("goodbye")
320        self.assertEqual(self.called, True)
321