1"""Unit tests for socket timeout feature."""
2
3import functools
4import unittest
5from test import support
6from test.support import socket_helper
7
8# This requires the 'network' resource as given on the regrtest command line.
9skip_expected = not support.is_resource_enabled('network')
10
11import time
12import errno
13import socket
14
15
@functools.lru_cache()
def resolve_address(host, port):
    """Return the first IPv4 stream socket address found for (host, port).

    Name resolution is done here, ahead of the timeout tests, so that
    connect() does not have to perform it.  Results are memoized per
    (host, port) pair by the lru_cache decorator.
    """
    with socket_helper.transient_internet(host):
        candidates = socket.getaddrinfo(host, port, socket.AF_INET,
                                        socket.SOCK_STREAM)
        first_entry = candidates[0]
        # Item 4 of a getaddrinfo() entry is the socket address itself.
        sockaddr = first_entry[4]
        return sockaddr
26
27
class CreationTestCase(unittest.TestCase):
    """Test case for socket.gettimeout() and socket.settimeout().

    Each test runs against a fresh, unconnected TCP socket created in
    setUp() and closed in tearDown().
    """

    def setUp(self):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    def tearDown(self):
        self.sock.close()

    def testObjectCreation(self):
        # Test Socket creation: no timeout must be set by default
        self.assertIsNone(self.sock.gettimeout(),
                          "timeout not disabled by default")

    def testFloatReturnValue(self):
        # Test return value of gettimeout()
        self.sock.settimeout(7.345)
        self.assertEqual(self.sock.gettimeout(), 7.345)

        self.sock.settimeout(3)
        self.assertEqual(self.sock.gettimeout(), 3)

        self.sock.settimeout(None)
        self.assertIsNone(self.sock.gettimeout())

    def testReturnType(self):
        # Test return type of gettimeout(): a float is expected whether the
        # timeout was given as an int or as a float
        for value in (1, 3.9):
            with self.subTest(value=value):
                self.sock.settimeout(value)
                self.assertIsInstance(self.sock.gettimeout(), float)

    def testTypeCheck(self):
        # Test type checking by settimeout()
        # Numbers and None must be accepted...
        for value in (0, 0.0, None):
            with self.subTest(value=value):
                self.sock.settimeout(value)
        # ...while any other type must be rejected with TypeError.
        for value in ("", (), [], {}, 0j):
            with self.subTest(value=value):
                self.assertRaises(TypeError, self.sock.settimeout, value)

    def testRangeCheck(self):
        # Test range checking by settimeout(): negative timeouts are invalid
        for value in (-1, -1.0):
            with self.subTest(value=value):
                self.assertRaises(ValueError, self.sock.settimeout, value)

    def testTimeoutThenBlocking(self):
        # Test settimeout() followed by setblocking()
        self.sock.settimeout(10)
        self.sock.setblocking(True)
        self.assertIsNone(self.sock.gettimeout())
        self.sock.setblocking(False)
        self.assertEqual(self.sock.gettimeout(), 0.0)

        self.sock.settimeout(10)
        self.sock.setblocking(False)
        self.assertEqual(self.sock.gettimeout(), 0.0)
        self.sock.setblocking(True)
        self.assertIsNone(self.sock.gettimeout())

    def testBlockingThenTimeout(self):
        # Test setblocking() followed by settimeout(): the timeout set last
        # wins regardless of the previous blocking mode
        self.sock.setblocking(False)
        self.sock.settimeout(1)
        self.assertEqual(self.sock.gettimeout(), 1)

        self.sock.setblocking(True)
        self.sock.settimeout(1)
        self.assertEqual(self.sock.gettimeout(), 1)
103
104
class TimeoutTestCase(unittest.TestCase):
    """Shared base for the socket timeout test cases.

    Subclasses must override setUp() and tearDown() and are expected to
    store the socket under test in self.sock, which _sock_operation() uses.
    """

    # There are a number of tests here trying to make sure that an operation
    # doesn't take too much longer than expected.  But competing machine
    # activity makes it inevitable that such tests will fail at times.
    # When fuzz was at 1.0, I (tim) routinely saw bogus failures on Win2K
    # and Win98SE.  Boosting it to 2.0 helped a lot, but isn't a real
    # solution.
    fuzz = 2.0

    localhost = socket_helper.HOST

    def setUp(self):
        # Must be overridden by concrete test cases.
        raise NotImplementedError()

    # tearDown must be overridden as well; until then it raises too.
    tearDown = setUp

    def _sock_operation(self, count, timeout, method, *args):
        """
        Test the specified socket method.

        `method` is the name of a method of self.sock; it is called with
        `*args` after `timeout` has been set on the socket.

        The method is run at most `count` times and must raise a socket.timeout
        within `timeout` + self.fuzz seconds.  The test fails if no call
        raises socket.timeout, or if the raising call took `timeout` - 1.0
        seconds or less.
        """
        self.sock.settimeout(timeout)
        method = getattr(self.sock, method)
        for _ in range(count):
            t1 = time.monotonic()
            try:
                method(*args)
            except socket.timeout:
                # Only the duration of the call that timed out is measured.
                delta = time.monotonic() - t1
                break
        else:
            self.fail('socket.timeout was not raised')
        # These checks should account for timing imprecision
        self.assertLess(delta, timeout + self.fuzz)
        self.assertGreater(delta, timeout - 1.0)
142
143
class TCPTimeoutTestCase(TimeoutTestCase):
    """TCP test case for socket.socket() timeout functions"""

    def setUp(self):
        # Resolve the remote address before creating the socket: tearDown()
        # is not run when setUp() raises, so a socket created first would be
        # leaked if the resolution failed.
        self.addr_remote = resolve_address('www.python.org.', 80)
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    def tearDown(self):
        self.sock.close()

    @unittest.skip('need to replace these hosts; see bpo-35518')
    def testConnectTimeout(self):
        # Testing connect timeout is tricky: we need to have IP connectivity
        # to a host that silently drops our packets.  We can't simulate this
        # from Python because it's a function of the underlying TCP/IP stack.
        # So, the following Snakebite host has been defined:
        blackhole = resolve_address('blackhole.snakebite.net', 56666)

        # Blackhole has been configured to silently drop any incoming packets.
        # No RSTs (for TCP) or ICMP UNREACH (for UDP/ICMP) will be sent back
        # to hosts that attempt to connect to this address: which is exactly
        # what we need to confidently test connect timeout.

        # However, we want to prevent false positives.  It's not unreasonable
        # to expect certain hosts may not be able to reach the blackhole, due
        # to firewalling or general network configuration.  In order to improve
        # our confidence in testing the blackhole, a corresponding 'whitehole'
        # has also been set up using one port higher:
        whitehole = resolve_address('whitehole.snakebite.net', 56667)

        # This address has been configured to immediately drop any incoming
        # packets as well, but it does it respectfully with regards to the
        # incoming protocol.  RSTs are sent for TCP packets, and ICMP UNREACH
        # is sent for UDP/ICMP packets.  This means our attempts to connect to
        # it should be met immediately with ECONNREFUSED.  The test case has
        # been structured around this premise: if we get an ECONNREFUSED from
        # the whitehole, we proceed with testing connect timeout against the
        # blackhole.  If we don't, we skip the test (with a message about not
        # getting the required RST from the whitehole within the required
        # timeframe).

        # For the record, the whitehole/blackhole configuration has been set
        # up using the 'pf' firewall (available on BSDs), using the following:
        #
        #   ext_if="bge0"
        #
        #   blackhole_ip="35.8.247.6"
        #   whitehole_ip="35.8.247.6"
        #   blackhole_port="56666"
        #   whitehole_port="56667"
        #
        #   block return in log quick on $ext_if proto { tcp udp } \
        #       from any to $whitehole_ip port $whitehole_port
        #   block drop in log quick on $ext_if proto { tcp udp } \
        #       from any to $blackhole_ip port $blackhole_port
        #

        skip = True
        timeout = support.LOOPBACK_TIMEOUT
        # The probe socket is closed by the with block whatever happens.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            try:
                sock.connect(whitehole)
            except socket.timeout:
                # Must be handled before OSError, which it derives from.
                pass
            except OSError as err:
                # Only an explicit refusal proves the whitehole is reachable.
                if err.errno == errno.ECONNREFUSED:
                    skip = False

        if skip:
            self.skipTest(
                "We didn't receive a connection reset (RST) packet from "
                "{}:{} within {} seconds, so we're unable to test connect "
                "timeout against the corresponding {}:{} (which is "
                "configured to silently drop packets)."
                    .format(
                        whitehole[0],
                        whitehole[1],
                        timeout,
                        blackhole[0],
                        blackhole[1],
                    )
            )

        # All that hard work just to test if connect times out in 0.001s ;-)
        self.addr_remote = blackhole
        with socket_helper.transient_internet(self.addr_remote[0]):
            self._sock_operation(1, 0.001, 'connect', self.addr_remote)

    def testRecvTimeout(self):
        # Test recv() timeout
        with socket_helper.transient_internet(self.addr_remote[0]):
            self.sock.connect(self.addr_remote)
            self._sock_operation(1, 1.5, 'recv', 1024)

    def testAcceptTimeout(self):
        # Test accept() timeout
        socket_helper.bind_port(self.sock, self.localhost)
        self.sock.listen()
        self._sock_operation(1, 1.5, 'accept')

    def _check_send_timeout(self, method, *, pass_address=False):
        """Check that the send-like `method` of self.sock times out.

        self.sock is connected to a local listening socket on which no
        connection is accepted here, then `method` is called up to 100
        times with a 200000-byte payload and must raise socket.timeout.
        If `pass_address` is true, the server address is passed as an
        extra argument after the payload (as sendto() requires).
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serv:
            socket_helper.bind_port(serv, self.localhost)
            serv.listen()
            address = serv.getsockname()
            self.sock.connect(address)
            args = (b"X" * 200000,)
            if pass_address:
                # The address argument is ignored since we already connected.
                args += (address,)
            # Send a lot of data in order to bypass buffering in the TCP stack.
            self._sock_operation(100, 1.5, method, *args)

    def testSend(self):
        # Test send() timeout
        self._check_send_timeout('send')

    def testSendto(self):
        # Test sendto() timeout
        self._check_send_timeout('sendto', pass_address=True)

    def testSendall(self):
        # Test sendall() timeout
        self._check_send_timeout('sendall')
275
276
class UDPTimeoutTestCase(TimeoutTestCase):
    """Timeout test case for UDP (datagram) sockets"""

    def setUp(self):
        udp_sock = socket.socket(family=socket.AF_INET,
                                 type=socket.SOCK_DGRAM)
        self.sock = udp_sock

    def tearDown(self):
        self.sock.close()

    def testRecvfromTimeout(self):
        # recvfrom() must time out on a bound socket.
        # Binding through bind_port() is meant to prevent
        # "Address already in use" socket exceptions.
        socket_helper.bind_port(self.sock, host=self.localhost)
        operation, bufsize = 'recvfrom', 1024
        self._sock_operation(1, 1.5, operation, bufsize)
291
292
def setUpModule():
    """Module-level fixture: require the 'network' resource for these tests."""
    required_resource = 'network'
    support.requires(required_resource)
295
296
# Run the tests through unittest when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
299