1"""Unit tests for socket timeout feature."""
2
3import functools
4import unittest
5from test import support
6
# These tests need the 'network' resource, which is enabled from the regrtest
# command line.  skip_expected is simply the negation of
# support.is_resource_enabled('network').
# NOTE(review): nothing in this file reads skip_expected; presumably it is
# consumed by the test runner -- confirm before removing.
skip_expected = not support.is_resource_enabled('network')
9
10import time
11import errno
12import socket
13
14
@functools.lru_cache()
def resolve_address(host, port):
    """Return the first IPv4/TCP socket address found for (host, port).

    Name resolution is done here, ahead of the timeout tests, because
    otherwise connect() would perform it and the lookup time would be mixed
    into the measured operation.  Results are memoized by lru_cache, so a
    given (host, port) pair is only looked up once per process.

    The lookup runs inside support.transient_internet(host); presumably that
    helper converts transient network errors into skipped tests -- see
    test.support for its exact contract.
    """
    with support.transient_internet(host):
        candidates = socket.getaddrinfo(
            host, port, socket.AF_INET, socket.SOCK_STREAM)
        # Each getaddrinfo() entry is
        # (family, type, proto, canonname, sockaddr); keep the sockaddr of
        # the first entry.
        first_entry = candidates[0]
        return first_entry[4]
25
26
class CreationTestCase(unittest.TestCase):
    """Test case for socket.gettimeout() and socket.settimeout().

    Every test works on a fresh, unconnected IPv4 TCP socket created in
    setUp() and closed in tearDown(); nothing here connects to a peer.
    """

    def setUp(self):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    def tearDown(self):
        self.sock.close()

    def testObjectCreation(self):
        # Test Socket creation: a new socket must have no timeout set.
        self.assertIsNone(self.sock.gettimeout(),
                          "timeout not disabled by default")

    def testFloatReturnValue(self):
        # Test return value of gettimeout(): it must compare equal to
        # whatever was last handed to settimeout().
        for value in (7.345, 3, None):
            with self.subTest(timeout=value):
                self.sock.settimeout(value)
                self.assertEqual(self.sock.gettimeout(), value)

    def testReturnType(self):
        # Test return type of gettimeout(): always exactly float, even
        # when the timeout was given as an int.
        for value in (1, 3.9):
            with self.subTest(timeout=value):
                self.sock.settimeout(value)
                self.assertIs(type(self.sock.gettimeout()), float)

    def testTypeCheck(self):
        # Test type checking by settimeout()
        # Numbers and None are accepted without raising ...
        for value in (0, 0.0, None):
            self.sock.settimeout(value)
        # ... anything that is not a real number is rejected.
        for value in ("", (), [], {}, 0j):
            with self.subTest(timeout=value):
                self.assertRaises(TypeError, self.sock.settimeout, value)

    def testRangeCheck(self):
        # Test range checking by settimeout(): negative values are invalid.
        for value in (-1, -1.0):
            with self.subTest(timeout=value):
                self.assertRaises(ValueError, self.sock.settimeout, value)

    def testTimeoutThenBlocking(self):
        # Test settimeout() followed by setblocking(): blocking mode
        # reports a timeout of None, non-blocking mode reports 0.0, and
        # either one overrides a previously set timeout.
        self.sock.settimeout(10)
        self.sock.setblocking(True)
        self.assertIsNone(self.sock.gettimeout())
        self.sock.setblocking(False)
        self.assertEqual(self.sock.gettimeout(), 0.0)

        self.sock.settimeout(10)
        self.sock.setblocking(False)
        self.assertEqual(self.sock.gettimeout(), 0.0)
        self.sock.setblocking(True)
        self.assertIsNone(self.sock.gettimeout())

    def testBlockingThenTimeout(self):
        # Test setblocking() followed by settimeout(): the explicit timeout
        # wins regardless of the blocking mode that was selected before.
        for blocking in (False, True):
            with self.subTest(blocking=blocking):
                self.sock.setblocking(blocking)
                self.sock.settimeout(1)
                self.assertEqual(self.sock.gettimeout(), 1)
102
103
class TimeoutTestCase(unittest.TestCase):
    # Shared machinery for the timing tests below.  Subclasses must provide
    # their own setUp()/tearDown() and are expected to set self.sock, which
    # _sock_operation() reads.
    #
    # There are a number of tests here trying to make sure that an operation
    # doesn't take too much longer than expected.  But competing machine
    # activity makes it inevitable that such tests will fail at times.
    # When fuzz was at 1.0, I (tim) routinely saw bogus failures on Win2K
    # and Win98SE.  Boosting it to 2.0 helped a lot, but isn't a real
    # solution.
    fuzz = 2.0

    localhost = support.HOST

    def setUp(self):
        # Deliberately unusable: concrete test cases have to override this.
        raise NotImplementedError

    # Same placeholder for tearDown, so subclasses must override both.
    tearDown = setUp

    def _sock_operation(self, count, timeout, method, *args):
        """
        Check that a socket method times out when it is supposed to.

        `method` is the name of an attribute of self.sock; it is called with
        `args` up to `count` times after the socket timeout has been set to
        `timeout`.  One of those calls has to raise socket.timeout, and that
        call must have lasted more than `timeout` - 1.0 and less than
        `timeout` + self.fuzz seconds.  If no call times out the test fails.
        Any other exception raised by the method propagates to the caller.
        """
        self.sock.settimeout(timeout)
        operation = getattr(self.sock, method)
        elapsed = None
        for _ in range(count):
            started = time.monotonic()
            try:
                operation(*args)
            except socket.timeout:
                elapsed = time.monotonic() - started
                break
        if elapsed is None:
            self.fail('socket.timeout was not raised')
        # These checks should account for timing imprecision
        self.assertLess(elapsed, timeout + self.fuzz)
        self.assertGreater(elapsed, timeout - 1.0)
141
142
class TCPTimeoutTestCase(TimeoutTestCase):
    """TCP test case for socket.socket() timeout functions"""

    def setUp(self):
        # Resolve first: if resolution raises (or skips), tearDown() is not
        # run, so no socket may have been created yet or it would leak.
        self.addr_remote = resolve_address('www.python.org.', 80)
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    def tearDown(self):
        self.sock.close()

    def _check_send_timeout(self, method, pass_address=False):
        # Shared body of the send()/sendto()/sendall() tests: connect
        # self.sock to a local listening socket that never accepts or reads,
        # then call `method` repeatedly until it times out.  When
        # `pass_address` is true the server address is appended to the
        # arguments (needed by sendto()).
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serv:
            support.bind_port(serv, self.localhost)
            serv.listen()
            server_addr = serv.getsockname()
            self.sock.connect(server_addr)
            # Send a lot of data in order to bypass buffering in the TCP stack.
            payload = b"X" * 200000
            extra = (server_addr,) if pass_address else ()
            self._sock_operation(100, 1.5, method, payload, *extra)

    @unittest.skipIf(True, 'need to replace these hosts; see bpo-35518')
    def testConnectTimeout(self):
        # Testing connect timeout is tricky: we need to have IP connectivity
        # to a host that silently drops our packets.  We can't simulate this
        # from Python because it's a function of the underlying TCP/IP stack.
        # So, the following Snakebite host has been defined:
        blackhole = resolve_address('blackhole.snakebite.net', 56666)

        # Blackhole has been configured to silently drop any incoming packets.
        # No RSTs (for TCP) or ICMP UNREACH (for UDP/ICMP) will be sent back
        # to hosts that attempt to connect to this address: which is exactly
        # what we need to confidently test connect timeout.

        # However, we want to prevent false positives.  It's not unreasonable
        # to expect certain hosts may not be able to reach the blackhole, due
        # to firewalling or general network configuration.  In order to improve
        # our confidence in testing the blackhole, a corresponding 'whitehole'
        # has also been set up using one port higher:
        whitehole = resolve_address('whitehole.snakebite.net', 56667)

        # This address has been configured to immediately drop any incoming
        # packets as well, but it does it respectfully with regards to the
        # incoming protocol.  RSTs are sent for TCP packets, and ICMP UNREACH
        # is sent for UDP/ICMP packets.  This means our attempts to connect to
        # it should be met immediately with ECONNREFUSED.  The test case has
        # been structured around this premise: if we get an ECONNREFUSED from
        # the whitehole, we proceed with testing connect timeout against the
        # blackhole.  If we don't, we skip the test (with a message about not
        # getting the required RST from the whitehole within the required
        # timeframe).

        # For the records, the whitehole/blackhole configuration has been set
        # up using the 'pf' firewall (available on BSDs), using the following:
        #
        #   ext_if="bge0"
        #
        #   blackhole_ip="35.8.247.6"
        #   whitehole_ip="35.8.247.6"
        #   blackhole_port="56666"
        #   whitehole_port="56667"
        #
        #   block return in log quick on $ext_if proto { tcp udp } \
        #       from any to $whitehole_ip port $whitehole_port
        #   block drop in log quick on $ext_if proto { tcp udp } \
        #       from any to $blackhole_ip port $blackhole_port
        #

        skip = True
        # Use a timeout of 3 seconds.  Why 3?  Because it's more than 1, and
        # less than 5.  i.e. no particular reason.  Feel free to tweak it if
        # you feel a different value would be more appropriate.
        timeout = 3
        # The probe socket is closed by the with-statement on every path,
        # including a failure of settimeout() itself.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            try:
                sock.connect(whitehole)
            except socket.timeout:
                # Must be caught before OSError (its base class): a timeout
                # means no RST arrived, so the test stays skipped.
                pass
            except OSError as err:
                # Only an explicit refusal proves the whitehole is reachable;
                # any other error leaves the test skipped.
                if err.errno == errno.ECONNREFUSED:
                    skip = False

        if skip:
            self.skipTest(
                "We didn't receive a connection reset (RST) packet from "
                "{}:{} within {} seconds, so we're unable to test connect "
                "timeout against the corresponding {}:{} (which is "
                "configured to silently drop packets)."
                    .format(
                        whitehole[0],
                        whitehole[1],
                        timeout,
                        blackhole[0],
                        blackhole[1],
                    )
            )

        # All that hard work just to test if connect times out in 0.001s ;-)
        self.addr_remote = blackhole
        with support.transient_internet(self.addr_remote[0]):
            self._sock_operation(1, 0.001, 'connect', self.addr_remote)

    def testRecvTimeout(self):
        # Test recv() timeout: connect to the remote host and send nothing,
        # then expect recv() to time out.
        with support.transient_internet(self.addr_remote[0]):
            self.sock.connect(self.addr_remote)
            self._sock_operation(1, 1.5, 'recv', 1024)

    def testAcceptTimeout(self):
        # Test accept() timeout: listen on a local port that nobody
        # connects to.
        support.bind_port(self.sock, self.localhost)
        self.sock.listen()
        self._sock_operation(1, 1.5, 'accept')

    def testSend(self):
        # Test send() timeout
        self._check_send_timeout('send')

    def testSendto(self):
        # Test sendto() timeout
        # The address argument is ignored since we already connected.
        self._check_send_timeout('sendto', pass_address=True)

    def testSendall(self):
        # Test sendall() timeout
        self._check_send_timeout('sendall')
277
278
class UDPTimeoutTestCase(TimeoutTestCase):
    """Timeout checks for datagram (UDP) sockets created by socket.socket()."""

    def setUp(self):
        # Every test gets its own IPv4 datagram socket.
        self.sock = socket.socket(family=socket.AF_INET,
                                  type=socket.SOCK_DGRAM)

    def tearDown(self):
        self.sock.close()

    def testRecvfromTimeout(self):
        # Test recvfrom() timeout
        # Binding through support.bind_port() prevents "Address already in
        # use" socket exceptions.
        support.bind_port(self.sock, self.localhost)
        # Nothing is ever sent to this socket, so recvfrom() has to time out.
        buffer_size = 1024
        self._sock_operation(1, 1.5, 'recvfrom', buffer_size)
293
294
def test_main():
    """Run every test case defined in this module.

    support.requires('network') is called first; presumably it refuses to
    continue when the 'network' resource is not enabled -- see test.support
    for the exact behavior.
    """
    support.requires('network')
    test_cases = (
        CreationTestCase,
        TCPTimeoutTestCase,
        UDPTimeoutTestCase,
    )
    support.run_unittest(*test_cases)


# Allow running this file directly as a script.
if __name__ == "__main__":
    test_main()
305