1import unittest
2from test import test_support
3
4import errno
5import itertools
6import socket
7import select
8import time
9import traceback
10import Queue
11import sys
12import os
13import array
14import contextlib
15import signal
16import math
17import weakref
18try:
19    import _socket
20except ImportError:
21    _socket = None
22
23
# Timeout constant for the tests in this module.
# NOTE(review): presumably seconds; it is not referenced in the visible part
# of this file -- confirm the unit and the users before changing it.
MAIN_TIMEOUT = 60.0
25
26
def try_address(host, port=0, family=socket.AF_INET):
    """Report whether a stream socket can be bound to ``host:port``.

    Args:
        host: Host name or address string passed to bind().
        port: Port number passed to bind(); defaults to 0.
        family: Address family used to create the probe socket.

    Returns:
        True if the socket was created and bound, False if either step
        raised socket.error or socket.gaierror.
    """
    try:
        sock = socket.socket(family, socket.SOCK_STREAM)
    except (socket.error, socket.gaierror):
        return False
    try:
        sock.bind((host, port))
    except (socket.error, socket.gaierror):
        return False
    else:
        return True
    finally:
        # Close the probe socket on every path; a failed bind() used to
        # return without closing it.
        sock.close()
38
# Host used by the client/server tests, as provided by test_support.
HOST = test_support.HOST
# Payload exchanged between the client and server halves of the tests.
MSG = b'Michael Gilfix was here\n'
SUPPORTS_IPV6 = test_support.IPV6_ENABLED

# The thread modules are optional: when they cannot be imported both names
# are set to None so that callers can test for their availability.
try:
    import thread
    import threading
except ImportError:
    thread = None
    threading = None
52
class SocketTCPTest(unittest.TestCase):
    """Fixture that provides a listening TCP server socket.

    setUp() stores the socket in ``self.serv`` and the port returned by
    test_support.bind_port() in ``self.port``; tearDown() closes the socket
    and resets ``self.serv`` to None.
    """

    def setUp(self):
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.serv = server
        self.port = test_support.bind_port(server)
        server.listen(1)

    def tearDown(self):
        server = self.serv
        server.close()
        self.serv = None
63
class SocketUDPTest(unittest.TestCase):
    """Fixture that provides a bound UDP server socket.

    setUp() stores the socket in ``self.serv`` and the port returned by
    test_support.bind_port() in ``self.port``; tearDown() closes the socket
    and resets ``self.serv`` to None.
    """

    def setUp(self):
        server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.serv = server
        self.port = test_support.bind_port(server)

    def tearDown(self):
        server = self.serv
        server.close()
        self.serv = None
73
class ThreadableTest:
    """Threadable Test class

    The ThreadableTest class makes it easy to create a threaded
    client/server pair from an existing unit test. To create a
    new threaded class from an existing unit test, use multiple
    inheritance:

        class NewClass (OldClass, ThreadableTest):
            pass

    This class defines two new fixture functions with obvious
    purposes for overriding:

        clientSetUp ()
        clientTearDown ()

    Any new test functions within the class must then define
    tests in pairs, where the test name is preceded with a
    '_' to indicate the client portion of the test. Ex:

        def testFoo(self):
            # Server portion

        def _testFoo(self):
            # Client portion

    Any exceptions raised by the clients during their set-up or
    their tests are caught and transferred to the main thread to
    alert the testing framework.

    Note, the server setup function cannot call any blocking
    functions that rely on the client thread during setup,
    unless serverExplicitReady() is called just before
    the blocking call (such as in setting up a client/server
    connection and performing the accept() in setUp()).
    """

    def __init__(self):
        # Swap the true setup function: keep the real fixtures under private
        # names and install the thread-aware wrappers on the instance.
        self.__setUp = self.setUp
        self.__tearDown = self.tearDown
        self.setUp = self._setUp
        self.tearDown = self._tearDown

    def serverExplicitReady(self):
        """This method allows the server to explicitly indicate that
        it wants the client thread to proceed. This is useful if the
        server is about to execute a blocking routine that is
        dependent upon the client thread during its setup routine."""
        self.server_ready.set()

    def _setUp(self):
        """Start the client thread, run the real setUp(), then wait until
        the client fixture has run."""
        self.server_ready = threading.Event()
        self.client_ready = threading.Event()
        self.done = threading.Event()
        self.queue = Queue.Queue(1)

        # The client half of test "testFoo" is the method "_testFoo"; the
        # test name is the last dotted component of self.id().
        methodname = self.id().rsplit('.', 1)[-1]
        test_method = getattr(self, '_' + methodname)
        self.client_thread = thread.start_new_thread(
            self.clientRun, (test_method,))

        self.__setUp()
        # Event.set() is idempotent, so this is harmless when the real
        # setUp() already called serverExplicitReady().
        self.server_ready.set()
        self.client_ready.wait()

    def _tearDown(self):
        """Run the real tearDown(), wait for the client thread to finish and
        fail the test with any exception the client reported."""
        self.__tearDown()
        self.done.wait()

        if not self.queue.empty():
            msg = self.queue.get()
            self.fail(msg)

    def clientRun(self, test_func):
        """Body of the client thread.

        Waits for the server, runs clientSetUp(), the client test function
        and clientTearDown(). An Exception raised by the set-up, by the
        callable check or by the test function is put on self.queue for
        _tearDown() to report.
        """
        self.server_ready.wait()
        try:
            try:
                self.clientSetUp()
            finally:
                # Always release the main thread blocked in _setUp(), even
                # when the client fixture fails; otherwise it waits forever.
                self.client_ready.set()
            if not callable(test_func):
                raise TypeError("test_func must be a callable function.")
            test_func()
        except Exception as exc:
            self.queue.put(exc)
        finally:
            try:
                self.clientTearDown()
            finally:
                # Guarantee _tearDown() is released even if an overridden
                # clientTearDown() fails before reaching the base version.
                self.done.set()

    def clientSetUp(self):
        """Client-side fixture; must be overridden."""
        raise NotImplementedError("clientSetUp must be implemented.")

    def clientTearDown(self):
        """Signal that the client is finished and end the client thread."""
        self.done.set()
        thread.exit()
171
class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest):
    """TCP server fixture combined with a client thread.

    The client thread creates its own TCP socket in ``self.cli`` and closes
    it again in clientTearDown().
    """

    def __init__(self, methodName='runTest'):
        SocketTCPTest.__init__(self, methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    def clientTearDown(self):
        client = self.cli
        client.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
185
class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest):
    """UDP server fixture combined with a client thread.

    The client thread creates its own UDP socket in ``self.cli`` and closes
    it again in clientTearDown().
    """

    def __init__(self, methodName='runTest'):
        SocketUDPTest.__init__(self, methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def clientTearDown(self):
        client = self.cli
        client.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
199
class SocketConnectedTest(ThreadedTCPSocketTest):
    """Threaded TCP fixture with an established connection.

    The server side of the connection is ``self.cli_conn`` (the socket
    returned by accept()); the client side is ``self.serv_conn`` (the
    connected client socket).
    """

    def __init__(self, methodName='runTest'):
        ThreadedTCPSocketTest.__init__(self, methodName)

    def setUp(self):
        ThreadedTCPSocketTest.setUp(self)
        # accept() blocks until the client connects, so the client thread
        # has to be released explicitly before calling it.
        self.serverExplicitReady()
        self.cli_conn = self.serv.accept()[0]

    def tearDown(self):
        accepted = self.cli_conn
        accepted.close()
        self.cli_conn = None
        ThreadedTCPSocketTest.tearDown(self)

    def clientSetUp(self):
        ThreadedTCPSocketTest.clientSetUp(self)
        client = self.cli
        client.connect((HOST, self.port))
        self.serv_conn = client

    def clientTearDown(self):
        connection = self.serv_conn
        connection.close()
        self.serv_conn = None
        ThreadedTCPSocketTest.clientTearDown(self)
227
class SocketPairTest(unittest.TestCase, ThreadableTest):
    """Threaded fixture built on socket.socketpair().

    ``self.serv`` is used and closed by the main thread, ``self.cli`` is
    closed by the client thread.
    """

    def __init__(self, methodName='runTest'):
        unittest.TestCase.__init__(self, methodName)
        ThreadableTest.__init__(self)

    def setUp(self):
        pair = socket.socketpair()
        self.serv, self.cli = pair

    def tearDown(self):
        server = self.serv
        server.close()
        self.serv = None

    def clientSetUp(self):
        # Both ends already exist after setUp(); nothing to prepare.
        pass

    def clientTearDown(self):
        client = self.cli
        client.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
248
249
250#######################################################################
251## Begin Tests
252
253class GeneralModuleTests(unittest.TestCase):
254
255    @unittest.skipUnless(_socket is not None, 'need _socket module')
256    def test_csocket_repr(self):
257        s = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM)
258        try:
259            expected = ('<socket object, fd=%s, family=%s, type=%s, protocol=%s>'
260                        % (s.fileno(), s.family, s.type, s.proto))
261            self.assertEqual(repr(s), expected)
262        finally:
263            s.close()
264        expected = ('<socket object, fd=-1, family=%s, type=%s, protocol=%s>'
265                    % (s.family, s.type, s.proto))
266        self.assertEqual(repr(s), expected)
267
268    def test_weakref(self):
269        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
270        p = weakref.proxy(s)
271        self.assertEqual(p.fileno(), s.fileno())
272        s.close()
273        s = None
274        try:
275            p.fileno()
276        except ReferenceError:
277            pass
278        else:
279            self.fail('Socket proxy still exists')
280
281    def test_weakref__sock(self):
282        s = socket.socket()._sock
283        w = weakref.ref(s)
284        self.assertIs(w(), s)
285        del s
286        test_support.gc_collect()
287        self.assertIsNone(w())
288
289    def testSocketError(self):
290        # Testing socket module exceptions
291        def raise_error(*args, **kwargs):
292            raise socket.error
293        def raise_herror(*args, **kwargs):
294            raise socket.herror
295        def raise_gaierror(*args, **kwargs):
296            raise socket.gaierror
297        self.assertRaises(socket.error, raise_error,
298                              "Error raising socket exception.")
299        self.assertRaises(socket.error, raise_herror,
300                              "Error raising socket exception.")
301        self.assertRaises(socket.error, raise_gaierror,
302                              "Error raising socket exception.")
303
304    def testSendtoErrors(self):
305        # Testing that sendto doesn't mask failures. See #10169.
306        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
307        self.addCleanup(s.close)
308        s.bind(('', 0))
309        sockname = s.getsockname()
310        # 2 args
311        with self.assertRaises(UnicodeEncodeError):
312            s.sendto(u'\u2620', sockname)
313        with self.assertRaises(TypeError) as cm:
314            s.sendto(5j, sockname)
315        self.assertIn('not complex', str(cm.exception))
316        with self.assertRaises(TypeError) as cm:
317            s.sendto('foo', None)
318        self.assertIn('not NoneType', str(cm.exception))
319        # 3 args
320        with self.assertRaises(UnicodeEncodeError):
321            s.sendto(u'\u2620', 0, sockname)
322        with self.assertRaises(TypeError) as cm:
323            s.sendto(5j, 0, sockname)
324        self.assertIn('not complex', str(cm.exception))
325        with self.assertRaises(TypeError) as cm:
326            s.sendto('foo', 0, None)
327        self.assertIn('not NoneType', str(cm.exception))
328        with self.assertRaises(TypeError) as cm:
329            s.sendto('foo', 'bar', sockname)
330        self.assertIn('an integer is required', str(cm.exception))
331        with self.assertRaises(TypeError) as cm:
332            s.sendto('foo', None, None)
333        self.assertIn('an integer is required', str(cm.exception))
334        # wrong number of args
335        with self.assertRaises(TypeError) as cm:
336            s.sendto('foo')
337        self.assertIn('(1 given)', str(cm.exception))
338        with self.assertRaises(TypeError) as cm:
339            s.sendto('foo', 0, sockname, 4)
340        self.assertIn('(4 given)', str(cm.exception))
341
342
343    def testCrucialConstants(self):
344        # Testing for mission critical constants
345        socket.AF_INET
346        socket.SOCK_STREAM
347        socket.SOCK_DGRAM
348        socket.SOCK_RAW
349        socket.SOCK_RDM
350        socket.SOCK_SEQPACKET
351        socket.SOL_SOCKET
352        socket.SO_REUSEADDR
353
354    def testHostnameRes(self):
355        # Testing hostname resolution mechanisms
356        hostname = socket.gethostname()
357        try:
358            ip = socket.gethostbyname(hostname)
359        except socket.error:
360            # Probably name lookup wasn't set up right; skip this test
361            self.skipTest('name lookup failure')
362        self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.")
363        try:
364            hname, aliases, ipaddrs = socket.gethostbyaddr(ip)
365        except socket.error:
366            # Probably a similar problem as above; skip this test
367            self.skipTest('address lookup failure')
368        all_host_names = [hostname, hname] + aliases
369        fqhn = socket.getfqdn(ip)
370        if not fqhn in all_host_names:
371            self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))
372
373    @unittest.skipUnless(hasattr(sys, 'getrefcount'),
374                         'test needs sys.getrefcount()')
375    def testRefCountGetNameInfo(self):
376        # Testing reference count for getnameinfo
377        try:
378            # On some versions, this loses a reference
379            orig = sys.getrefcount(__name__)
380            socket.getnameinfo(__name__,0)
381        except TypeError:
382            self.assertEqual(sys.getrefcount(__name__), orig,
383                             "socket.getnameinfo loses a reference")
384
385    def testInterpreterCrash(self):
386        # Making sure getnameinfo doesn't crash the interpreter
387        try:
388            # On some versions, this crashes the interpreter.
389            socket.getnameinfo(('x', 0, 0, 0), 0)
390        except socket.error:
391            pass
392
393    def testNtoH(self):
394        # This just checks that htons etc. are their own inverse,
395        # when looking at the lower 16 or 32 bits.
396        sizes = {socket.htonl: 32, socket.ntohl: 32,
397                 socket.htons: 16, socket.ntohs: 16}
398        for func, size in sizes.items():
399            mask = (1L<<size) - 1
400            for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
401                self.assertEqual(i & mask, func(func(i&mask)) & mask)
402
403            swapped = func(mask)
404            self.assertEqual(swapped & mask, mask)
405            self.assertRaises(OverflowError, func, 1L<<34)
406
407    def testNtoHErrors(self):
408        good_values = [ 1, 2, 3, 1L, 2L, 3L ]
409        bad_values = [ -1, -2, -3, -1L, -2L, -3L ]
410        for k in good_values:
411            socket.ntohl(k)
412            socket.ntohs(k)
413            socket.htonl(k)
414            socket.htons(k)
415        for k in bad_values:
416            self.assertRaises(OverflowError, socket.ntohl, k)
417            self.assertRaises(OverflowError, socket.ntohs, k)
418            self.assertRaises(OverflowError, socket.htonl, k)
419            self.assertRaises(OverflowError, socket.htons, k)
420
421    def testGetServBy(self):
422        eq = self.assertEqual
423        # Find one service that exists, then check all the related interfaces.
424        # I've ordered this by protocols that have both a tcp and udp
425        # protocol, at least for modern Linuxes.
426        if (sys.platform.startswith('linux') or
427            sys.platform.startswith('freebsd') or
428            sys.platform.startswith('netbsd') or
429            sys.platform == 'darwin'):
430            # avoid the 'echo' service on this platform, as there is an
431            # assumption breaking non-standard port/protocol entry
432            services = ('daytime', 'qotd', 'domain')
433        else:
434            services = ('echo', 'daytime', 'domain')
435        for service in services:
436            try:
437                port = socket.getservbyname(service, 'tcp')
438                break
439            except socket.error:
440                pass
441        else:
442            raise socket.error
443        # Try same call with optional protocol omitted
444        port2 = socket.getservbyname(service)
445        eq(port, port2)
446        # Try udp, but don't barf if it doesn't exist
447        try:
448            udpport = socket.getservbyname(service, 'udp')
449        except socket.error:
450            udpport = None
451        else:
452            eq(udpport, port)
453        # Now make sure the lookup by port returns the same service name
454        eq(socket.getservbyport(port2), service)
455        eq(socket.getservbyport(port, 'tcp'), service)
456        if udpport is not None:
457            eq(socket.getservbyport(udpport, 'udp'), service)
458        # Make sure getservbyport does not accept out of range ports.
459        self.assertRaises(OverflowError, socket.getservbyport, -1)
460        self.assertRaises(OverflowError, socket.getservbyport, 65536)
461
462    def testDefaultTimeout(self):
463        # Testing default timeout
464        # The default timeout should initially be None
465        self.assertEqual(socket.getdefaulttimeout(), None)
466        s = socket.socket()
467        self.assertEqual(s.gettimeout(), None)
468        s.close()
469
470        # Set the default timeout to 10, and see if it propagates
471        socket.setdefaulttimeout(10)
472        self.assertEqual(socket.getdefaulttimeout(), 10)
473        s = socket.socket()
474        self.assertEqual(s.gettimeout(), 10)
475        s.close()
476
477        # Reset the default timeout to None, and see if it propagates
478        socket.setdefaulttimeout(None)
479        self.assertEqual(socket.getdefaulttimeout(), None)
480        s = socket.socket()
481        self.assertEqual(s.gettimeout(), None)
482        s.close()
483
484        # Check that setting it to an invalid value raises ValueError
485        self.assertRaises(ValueError, socket.setdefaulttimeout, -1)
486
487        # Check that setting it to an invalid type raises TypeError
488        self.assertRaises(TypeError, socket.setdefaulttimeout, "spam")
489
490    @unittest.skipUnless(hasattr(socket, 'inet_aton'),
491                         'test needs socket.inet_aton()')
492    def testIPv4_inet_aton_fourbytes(self):
493        # Test that issue1008086 and issue767150 are fixed.
494        # It must return 4 bytes.
495        self.assertEqual('\x00'*4, socket.inet_aton('0.0.0.0'))
496        self.assertEqual('\xff'*4, socket.inet_aton('255.255.255.255'))
497
498    @unittest.skipUnless(hasattr(socket, 'inet_pton'),
499                         'test needs socket.inet_pton()')
500    def testIPv4toString(self):
501        from socket import inet_aton as f, inet_pton, AF_INET
502        g = lambda a: inet_pton(AF_INET, a)
503
504        self.assertEqual('\x00\x00\x00\x00', f('0.0.0.0'))
505        self.assertEqual('\xff\x00\xff\x00', f('255.0.255.0'))
506        self.assertEqual('\xaa\xaa\xaa\xaa', f('170.170.170.170'))
507        self.assertEqual('\x01\x02\x03\x04', f('1.2.3.4'))
508        self.assertEqual('\xff\xff\xff\xff', f('255.255.255.255'))
509
510        self.assertEqual('\x00\x00\x00\x00', g('0.0.0.0'))
511        self.assertEqual('\xff\x00\xff\x00', g('255.0.255.0'))
512        self.assertEqual('\xaa\xaa\xaa\xaa', g('170.170.170.170'))
513        self.assertEqual('\xff\xff\xff\xff', g('255.255.255.255'))
514
515    @unittest.skipUnless(hasattr(socket, 'inet_pton'),
516                         'test needs socket.inet_pton()')
517    def testIPv6toString(self):
518        try:
519            from socket import inet_pton, AF_INET6, has_ipv6
520            if not has_ipv6:
521                self.skipTest('IPv6 not available')
522        except ImportError:
523            self.skipTest('could not import needed symbols from socket')
524        f = lambda a: inet_pton(AF_INET6, a)
525
526        self.assertEqual('\x00' * 16, f('::'))
527        self.assertEqual('\x00' * 16, f('0::0'))
528        self.assertEqual('\x00\x01' + '\x00' * 14, f('1::'))
529        self.assertEqual(
530            '\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae',
531            f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')
532        )
533
534    @unittest.skipUnless(hasattr(socket, 'inet_ntop'),
535                         'test needs socket.inet_ntop()')
536    def testStringToIPv4(self):
537        from socket import inet_ntoa as f, inet_ntop, AF_INET
538        g = lambda a: inet_ntop(AF_INET, a)
539
540        self.assertEqual('1.0.1.0', f('\x01\x00\x01\x00'))
541        self.assertEqual('170.85.170.85', f('\xaa\x55\xaa\x55'))
542        self.assertEqual('255.255.255.255', f('\xff\xff\xff\xff'))
543        self.assertEqual('1.2.3.4', f('\x01\x02\x03\x04'))
544
545        self.assertEqual('1.0.1.0', g('\x01\x00\x01\x00'))
546        self.assertEqual('170.85.170.85', g('\xaa\x55\xaa\x55'))
547        self.assertEqual('255.255.255.255', g('\xff\xff\xff\xff'))
548
549    @unittest.skipUnless(hasattr(socket, 'inet_ntop'),
550                         'test needs socket.inet_ntop()')
551    def testStringToIPv6(self):
552        try:
553            from socket import inet_ntop, AF_INET6, has_ipv6
554            if not has_ipv6:
555                self.skipTest('IPv6 not available')
556        except ImportError:
557            self.skipTest('could not import needed symbols from socket')
558        f = lambda a: inet_ntop(AF_INET6, a)
559
560        self.assertEqual('::', f('\x00' * 16))
561        self.assertEqual('::1', f('\x00' * 15 + '\x01'))
562        self.assertEqual(
563            'aef:b01:506:1001:ffff:9997:55:170',
564            f('\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70')
565        )
566
567    # XXX The following don't test module-level functionality...
568
569    def _get_unused_port(self, bind_address='0.0.0.0'):
570        """Use a temporary socket to elicit an unused ephemeral port.
571
572        Args:
573            bind_address: Hostname or IP address to search for a port on.
574
575        Returns: A most likely to be unused port.
576        """
577        tempsock = socket.socket()
578        tempsock.bind((bind_address, 0))
579        host, port = tempsock.getsockname()
580        tempsock.close()
581        return port
582
583    def testSockName(self):
584        # Testing getsockname()
585        port = self._get_unused_port()
586        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
587        self.addCleanup(sock.close)
588        sock.bind(("0.0.0.0", port))
589        name = sock.getsockname()
590        # XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate
591        # it reasonable to get the host's addr in addition to 0.0.0.0.
592        # At least for eCos.  This is required for the S/390 to pass.
593        try:
594            my_ip_addr = socket.gethostbyname(socket.gethostname())
595        except socket.error:
596            # Probably name lookup wasn't set up right; skip this test
597            self.skipTest('name lookup failure')
598        self.assertIn(name[0], ("0.0.0.0", my_ip_addr), '%s invalid' % name[0])
599        self.assertEqual(name[1], port)
600
601    def testGetSockOpt(self):
602        # Testing getsockopt()
603        # We know a socket should start without reuse==0
604        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
605        self.addCleanup(sock.close)
606        reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
607        self.assertFalse(reuse != 0, "initial mode is reuse")
608
609    def testSetSockOpt(self):
610        # Testing setsockopt()
611        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
612        self.addCleanup(sock.close)
613        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
614        reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
615        self.assertFalse(reuse == 0, "failed to set reuse mode")
616
617    def testSendAfterClose(self):
618        # testing send() after close() with timeout
619        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
620        sock.settimeout(1)
621        sock.close()
622        self.assertRaises(socket.error, sock.send, "spam")
623
624    def testNewAttributes(self):
625        # testing .family, .type and .protocol
626        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
627        self.assertEqual(sock.family, socket.AF_INET)
628        self.assertEqual(sock.type, socket.SOCK_STREAM)
629        self.assertEqual(sock.proto, 0)
630        sock.close()
631
632    def test_getsockaddrarg(self):
633        sock = socket.socket()
634        self.addCleanup(sock.close)
635        port = test_support.find_unused_port()
636        big_port = port + 65536
637        neg_port = port - 65536
638        self.assertRaises(OverflowError, sock.bind, (HOST, big_port))
639        self.assertRaises(OverflowError, sock.bind, (HOST, neg_port))
640        # Since find_unused_port() is inherently subject to race conditions, we
641        # call it a couple times if necessary.
642        for i in itertools.count():
643            port = test_support.find_unused_port()
644            try:
645                sock.bind((HOST, port))
646            except OSError as e:
647                if e.errno != errno.EADDRINUSE or i == 5:
648                    raise
649            else:
650                break
651
652    @unittest.skipUnless(os.name == "nt", "Windows specific")
653    def test_sock_ioctl(self):
654        self.assertTrue(hasattr(socket.socket, 'ioctl'))
655        self.assertTrue(hasattr(socket, 'SIO_RCVALL'))
656        self.assertTrue(hasattr(socket, 'RCVALL_ON'))
657        self.assertTrue(hasattr(socket, 'RCVALL_OFF'))
658        self.assertTrue(hasattr(socket, 'SIO_KEEPALIVE_VALS'))
659        s = socket.socket()
660        self.addCleanup(s.close)
661        self.assertRaises(ValueError, s.ioctl, -1, None)
662        s.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 100, 100))
663
664    def testGetaddrinfo(self):
665        try:
666            socket.getaddrinfo('localhost', 80)
667        except socket.gaierror as err:
668            if err.errno == socket.EAI_SERVICE:
669                # see http://bugs.python.org/issue1282647
670                self.skipTest("buggy libc version")
671            raise
672        # len of every sequence is supposed to be == 5
673        for info in socket.getaddrinfo(HOST, None):
674            self.assertEqual(len(info), 5)
675        # host can be a domain name, a string representation of an
676        # IPv4/v6 address or None
677        socket.getaddrinfo('localhost', 80)
678        socket.getaddrinfo('127.0.0.1', 80)
679        socket.getaddrinfo(None, 80)
680        if SUPPORTS_IPV6:
681            socket.getaddrinfo('::1', 80)
682        # port can be a string service name such as "http", a numeric
683        # port number (int or long), or None
684        socket.getaddrinfo(HOST, "http")
685        socket.getaddrinfo(HOST, 80)
686        socket.getaddrinfo(HOST, 80L)
687        socket.getaddrinfo(HOST, None)
688        # test family and socktype filters
689        infos = socket.getaddrinfo(HOST, None, socket.AF_INET)
690        for family, _, _, _, _ in infos:
691            self.assertEqual(family, socket.AF_INET)
692        infos = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM)
693        for _, socktype, _, _, _ in infos:
694            self.assertEqual(socktype, socket.SOCK_STREAM)
695        # test proto and flags arguments
696        socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP)
697        socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE)
698        # a server willing to support both IPv4 and IPv6 will
699        # usually do this
700        socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
701                           socket.AI_PASSIVE)
702
703        # Issue 17269: test workaround for OS X platform bug segfault
704        if hasattr(socket, 'AI_NUMERICSERV'):
705            try:
706                # The arguments here are undefined and the call may succeed
707                # or fail.  All we care here is that it doesn't segfault.
708                socket.getaddrinfo("localhost", None, 0, 0, 0,
709                                   socket.AI_NUMERICSERV)
710            except socket.gaierror:
711                pass
712
713    def check_sendall_interrupted(self, with_timeout):
714        # socketpair() is not strictly required, but it makes things easier.
715        if not hasattr(signal, 'alarm') or not hasattr(socket, 'socketpair'):
716            self.skipTest("signal.alarm and socket.socketpair required for this test")
717        # Our signal handlers clobber the C errno by calling a math function
718        # with an invalid domain value.
719        def ok_handler(*args):
720            self.assertRaises(ValueError, math.acosh, 0)
721        def raising_handler(*args):
722            self.assertRaises(ValueError, math.acosh, 0)
723            1 // 0
724        c, s = socket.socketpair()
725        old_alarm = signal.signal(signal.SIGALRM, raising_handler)
726        try:
727            if with_timeout:
728                # Just above the one second minimum for signal.alarm
729                c.settimeout(1.5)
730            with self.assertRaises(ZeroDivisionError):
731                signal.alarm(1)
732                c.sendall(b"x" * test_support.SOCK_MAX_SIZE)
733            if with_timeout:
734                signal.signal(signal.SIGALRM, ok_handler)
735                signal.alarm(1)
736                self.assertRaises(socket.timeout, c.sendall,
737                                  b"x" * test_support.SOCK_MAX_SIZE)
738        finally:
739            signal.alarm(0)
740            signal.signal(signal.SIGALRM, old_alarm)
741            c.close()
742            s.close()
743
744    def test_sendall_interrupted(self):
745        self.check_sendall_interrupted(False)
746
747    def test_sendall_interrupted_with_timeout(self):
748        self.check_sendall_interrupted(True)
749
750    def test_listen_backlog(self):
751        for backlog in 0, -1:
752            srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
753            srv.bind((HOST, 0))
754            srv.listen(backlog)
755            srv.close()
756
757    @test_support.cpython_only
758    def test_listen_backlog_overflow(self):
759        # Issue 15989
760        import _testcapi
761        srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
762        srv.bind((HOST, 0))
763        self.assertRaises(OverflowError, srv.listen, _testcapi.INT_MAX + 1)
764        srv.close()
765
766    @unittest.skipUnless(SUPPORTS_IPV6, 'IPv6 required for this test.')
767    def test_flowinfo(self):
768        self.assertRaises(OverflowError, socket.getnameinfo,
769                          ('::1',0, 0xffffffff), 0)
770        s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
771        try:
772            self.assertRaises(OverflowError, s.bind, ('::1', 0, -10))
773        finally:
774            s.close()
775
776
@unittest.skipUnless(thread, 'Threading required for this test.')
class BasicTCPTest(SocketConnectedTest):
    # Basic data-transfer checks over a connected TCP pair.  Each testX
    # method reads from self.cli_conn while its _testX partner writes to
    # self.serv_conn.

    def __init__(self, methodName='runTest'):
        SocketConnectedTest.__init__(self, methodName=methodName)

    def testRecv(self):
        # A single recv() with a large buffer yields the whole message
        self.assertEqual(self.cli_conn.recv(1024), MSG)

    def _testRecv(self):
        self.serv_conn.send(MSG)

    def testOverFlowRecv(self):
        # Receive the message in two pieces: all but the last 3 bytes,
        # then whatever is left
        head = self.cli_conn.recv(len(MSG) - 3)
        tail = self.cli_conn.recv(1024)
        self.assertEqual(head + tail, MSG)

    def _testOverFlowRecv(self):
        self.serv_conn.send(MSG)

    def testRecvFrom(self):
        # recvfrom() with a large buffer over TCP; the address is ignored
        data, _ = self.cli_conn.recvfrom(1024)
        self.assertEqual(data, MSG)

    def _testRecvFrom(self):
        self.serv_conn.send(MSG)

    def testOverFlowRecvFrom(self):
        # recvfrom() in two pieces over TCP
        head, _ = self.cli_conn.recvfrom(len(MSG)-3)
        tail, _ = self.cli_conn.recvfrom(1024)
        self.assertEqual(head + tail, MSG)

    def _testOverFlowRecvFrom(self):
        self.serv_conn.send(MSG)

    def testSendAll(self):
        # Read until EOF and check that the 2048 byte string written by
        # sendall() arrived complete
        pieces = []
        while True:
            piece = self.cli_conn.recv(1024)
            if not piece:
                break
            pieces.append(piece)
        self.assertEqual(''.join(pieces), 'f' * 2048)

    def _testSendAll(self):
        self.serv_conn.sendall('f' * 2048)

    @unittest.skipUnless(hasattr(socket, 'fromfd'),
                         'socket.fromfd not available')
    def testFromFd(self):
        # A socket rebuilt with fromfd() from the connection's descriptor
        # must be able to receive the message
        rebuilt = socket.fromfd(self.cli_conn.fileno(),
                                socket.AF_INET, socket.SOCK_STREAM)
        self.addCleanup(rebuilt.close)
        self.assertEqual(rebuilt.recv(1024), MSG)

    def _testFromFd(self):
        self.serv_conn.send(MSG)

    def testDup(self):
        # A dup()'ed socket must be able to receive the message
        duplicate = self.cli_conn.dup()
        self.addCleanup(duplicate.close)
        self.assertEqual(duplicate.recv(1024), MSG)

    def _testDup(self):
        self.serv_conn.send(MSG)

    def testShutdown(self):
        # Testing shutdown()
        self.assertEqual(self.cli_conn.recv(1024), MSG)
        # wait for _testShutdown to finish: on OS X, when the server
        # closes the connection the client also becomes disconnected,
        # and the client's shutdown call will fail. (Issue #4397.)
        self.done.wait()

    def _testShutdown(self):
        self.serv_conn.send(MSG)
        self.serv_conn.shutdown(2)

    # Same receiving side as testShutdown, paired with the overflow
    # variant of the sending side below.
    testShutdown_overflow = test_support.cpython_only(testShutdown)

    @test_support.cpython_only
    def _testShutdown_overflow(self):
        import _testcapi
        self.serv_conn.send(MSG)
        # Issue 15989: out-of-range "how" values must raise OverflowError
        too_big_values = (_testcapi.INT_MAX + 1,
                          2 + (_testcapi.UINT_MAX + 1))
        for how in too_big_values:
            self.assertRaises(OverflowError, self.serv_conn.shutdown, how)
        self.serv_conn.shutdown(2)
881
@unittest.skipUnless(thread, 'Threading required for this test.')
class BasicUDPTest(ThreadedUDPSocketTest):
    # Basic datagram checks: each testX method uses self.serv while the
    # _testX partner sends MSG from self.cli to (HOST, self.port).

    def __init__(self, methodName='runTest'):
        ThreadedUDPSocketTest.__init__(self, methodName=methodName)

    def testSendtoAndRecv(self):
        # sendto() on one side, recv() on the other, over UDP
        received = self.serv.recv(len(MSG))
        self.assertEqual(received, MSG)

    def _testSendtoAndRecv(self):
        destination = (HOST, self.port)
        self.cli.sendto(MSG, 0, destination)

    def testRecvFrom(self):
        # recvfrom() over UDP; the sender address is not checked
        received, _ = self.serv.recvfrom(len(MSG))
        self.assertEqual(received, MSG)

    # The sending side is the same single datagram as above.
    _testRecvFrom = _testSendtoAndRecv

    def testRecvFromNegative(self):
        # Negative lengths passed to recvfrom should give ValueError.
        self.assertRaises(ValueError, self.serv.recvfrom, -1)

    _testRecvFromNegative = _testSendtoAndRecv
910
@unittest.skipUnless(thread, 'Threading required for this test.')
class TCPCloserTest(ThreadedTCPSocketTest):
    # Closing the accepted connection must make the peer readable and
    # its recv() return an empty string.

    def testClose(self):
        accepted, _ = self.serv.accept()
        accepted.close()

        client = self.cli
        readable = select.select([client], [], [], 1.0)[0]
        self.assertEqual(readable, [client])
        self.assertEqual(client.recv(1), '')

    def _testClose(self):
        server_address = (HOST, self.port)
        self.cli.connect(server_address)
        time.sleep(1.0)
926
@unittest.skipUnless(hasattr(socket, 'socketpair'),
                     'test needs socket.socketpair()')
@unittest.skipUnless(thread, 'Threading required for this test.')
class BasicSocketPairTest(SocketPairTest):
    # Sends MSG in each direction between self.serv and self.cli.

    def __init__(self, methodName='runTest'):
        SocketPairTest.__init__(self, methodName=methodName)

    def testRecv(self):
        # self.cli sends, self.serv receives
        self.assertEqual(self.serv.recv(1024), MSG)

    def _testRecv(self):
        self.cli.send(MSG)

    def testSend(self):
        # self.serv sends, self.cli receives
        self.serv.send(MSG)

    def _testSend(self):
        self.assertEqual(self.cli.recv(1024), MSG)
948
@unittest.skipUnless(thread, 'Threading required for this test.')
class NonBlockingTCPTests(ThreadedTCPSocketTest):
    # Checks of non-blocking behaviour for setblocking(), accept(),
    # connect() and recv().  The testX methods work on self.serv (and the
    # connections it accepts); the _testX counterparts drive self.cli.

    def __init__(self, methodName='runTest'):
        # Event set by a testX method to tell its _testX counterpart that
        # it may proceed (see testAccept/_testAccept, testRecv/_testRecv).
        self.event = threading.Event()
        ThreadedTCPSocketTest.__init__(self, methodName=methodName)

    def testSetBlocking(self):
        # Testing whether set blocking works
        self.serv.setblocking(True)
        self.assertIsNone(self.serv.gettimeout())
        self.serv.setblocking(False)
        self.assertEqual(self.serv.gettimeout(), 0.0)
        # With nothing connecting, a non-blocking accept() must come back
        # (with or without an error) in well under a second.
        start = time.time()
        try:
            self.serv.accept()
        except socket.error:
            pass
        end = time.time()
        self.assertTrue((end - start) < 1.0, "Error setting non-blocking mode.")

    def _testSetBlocking(self):
        # Nothing to do on the client side.
        pass

    @test_support.cpython_only
    def testSetBlocking_overflow(self):
        # Issue 15989: a flag value of UINT_MAX + 1 must still be treated
        # as true (blocking mode, i.e. timeout None).
        import _testcapi
        if _testcapi.UINT_MAX >= _testcapi.ULONG_MAX:
            self.skipTest('needs UINT_MAX < ULONG_MAX')
        self.serv.setblocking(False)
        self.assertEqual(self.serv.gettimeout(), 0.0)
        self.serv.setblocking(_testcapi.UINT_MAX + 1)
        self.assertIsNone(self.serv.gettimeout())

    # The client side of the overflow test is the same no-op as above.
    _testSetBlocking_overflow = test_support.cpython_only(_testSetBlocking)

    def testAccept(self):
        # Testing non-blocking accept
        self.serv.setblocking(0)

        # connect() didn't start: non-blocking accept() fails
        with self.assertRaises(socket.error):
            conn, addr = self.serv.accept()

        # Let _testAccept go ahead with its connect().
        self.event.set()

        # Wait (up to MAIN_TIMEOUT seconds) for the listening socket to
        # become readable, i.e. for the connection to be pending.
        read, write, err = select.select([self.serv], [], [], MAIN_TIMEOUT)
        if self.serv not in read:
            self.fail("Error trying to do accept after select.")

        # connect() completed: non-blocking accept() doesn't block
        conn, addr = self.serv.accept()
        self.addCleanup(conn.close)
        # The accepted connection is expected to be in blocking mode.
        self.assertIsNone(conn.gettimeout())

    def _testAccept(self):
        # don't connect before event is set to check
        # that non-blocking accept() raises socket.error
        self.event.wait()

        self.cli.connect((HOST, self.port))

    def testConnect(self):
        # Testing non-blocking connect
        conn, addr = self.serv.accept()
        conn.close()

    def _testConnect(self):
        # Connect with a 10 second timeout set on the client socket.
        self.cli.settimeout(10)
        self.cli.connect((HOST, self.port))

    def testRecv(self):
        # Testing non-blocking recv
        conn, addr = self.serv.accept()
        self.addCleanup(conn.close)
        conn.setblocking(0)

        # the peer didn't send any data yet: non-blocking recv() fails
        with self.assertRaises(socket.error):
            msg = conn.recv(len(MSG))

        # Let _testRecv go ahead and send the message.
        self.event.set()

        # Wait (up to MAIN_TIMEOUT seconds) for data to arrive.
        read, write, err = select.select([conn], [], [], MAIN_TIMEOUT)
        if conn not in read:
            self.fail("Error during select call to non-blocking socket.")

        # the peer has sent the data now: non-blocking recv() doesn't block
        msg = conn.recv(len(MSG))
        self.assertEqual(msg, MSG)

    def _testRecv(self):
        self.cli.connect((HOST, self.port))

        # don't send anything before event is set to check
        # that non-blocking recv() raises socket.error
        self.event.wait()

        # send data: recv() will no longer block
        self.cli.sendall(MSG)
1050
@unittest.skipUnless(thread, 'Threading required for this test.')
class FileObjectClassTestCase(SocketConnectedTest):
    # Exercises the file objects returned by makefile(): a reader
    # ('rb', self.bufsize) created on cli_conn and a writer ('wb')
    # created on serv_conn.  Subclasses override bufsize to repeat the
    # tests with other buffering modes.

    bufsize = -1 # Use default buffer size

    def __init__(self, methodName='runTest'):
        SocketConnectedTest.__init__(self, methodName=methodName)

    def setUp(self):
        SocketConnectedTest.setUp(self)
        reader = self.cli_conn.makefile('rb', self.bufsize)
        self.serv_file = reader

    def tearDown(self):
        reader = self.serv_file
        reader.close()
        self.assertTrue(reader.closed)
        SocketConnectedTest.tearDown(self)
        self.serv_file = None

    def clientSetUp(self):
        SocketConnectedTest.clientSetUp(self)
        writer = self.serv_conn.makefile('wb')
        self.cli_file = writer

    def clientTearDown(self):
        writer = self.cli_file
        writer.close()
        self.assertTrue(writer.closed)
        self.cli_file = None
        SocketConnectedTest.clientTearDown(self)

    def testSmallRead(self):
        # Read the message in two parts: all but 3 bytes, then 3 bytes
        head = self.serv_file.read(len(MSG)-3)
        tail = self.serv_file.read(3)
        self.assertEqual(head + tail, MSG)

    def _testSmallRead(self):
        self.cli_file.write(MSG)
        self.cli_file.flush()

    def testFullRead(self):
        # read until EOF
        self.assertEqual(self.serv_file.read(), MSG)

    def _testFullRead(self):
        # Closing (rather than flushing) the writer is what this variant
        # does after writing the message.
        self.cli_file.write(MSG)
        self.cli_file.close()

    def testUnbufferedRead(self):
        # Read one character at a time until nothing more comes back
        collected = []
        while True:
            char = self.serv_file.read(1)
            if not char:
                break
            collected.append(char)
        self.assertEqual(''.join(collected), MSG)

    def _testUnbufferedRead(self):
        self.cli_file.write(MSG)
        self.cli_file.flush()

    def testReadline(self):
        # A single readline() returns the newline-terminated message
        self.assertEqual(self.serv_file.readline(), MSG)

    def _testReadline(self):
        self.cli_file.write(MSG)
        self.cli_file.flush()

    def testReadlineAfterRead(self):
        # Two sized read() calls followed by three readline() calls
        for expected in ("A baloo is", " a bear"):
            self.assertEqual(expected, self.serv_file.read(len(expected)))
        for expected in ("\n", "A BALOO IS A BEAR.\n", MSG):
            self.assertEqual(expected, self.serv_file.readline())

    def _testReadlineAfterRead(self):
        for text in ("A baloo is a bear\n", "A BALOO IS A BEAR.\n", MSG):
            self.cli_file.write(text)
        self.cli_file.flush()

    def testReadlineAfterReadNoNewline(self):
        # readline() after a read() when the data has no trailing newline
        prefix = "End Of "
        self.assertEqual(prefix, self.serv_file.read(len(prefix)))
        self.assertEqual("Line", self.serv_file.readline())

    def _testReadlineAfterReadNoNewline(self):
        self.cli_file.write("End Of Line")

    def testClosedAttr(self):
        self.assertFalse(self.serv_file.closed)

    def _testClosedAttr(self):
        self.assertFalse(self.cli_file.closed)
1154
1155
1156class FileObjectInterruptedTestCase(unittest.TestCase):
1157    """Test that the file object correctly handles EINTR internally."""
1158
1159    class MockSocket(object):
1160        def __init__(self, recv_funcs=()):
1161            # A generator that returns callables that we'll call for each
1162            # call to recv().
1163            self._recv_step = iter(recv_funcs)
1164
1165        def recv(self, size):
1166            return self._recv_step.next()()
1167
1168    @staticmethod
1169    def _raise_eintr():
1170        raise socket.error(errno.EINTR)
1171
1172    def _test_readline(self, size=-1, **kwargs):
1173        mock_sock = self.MockSocket(recv_funcs=[
1174                lambda : "This is the first line\nAnd the sec",
1175                self._raise_eintr,
1176                lambda : "ond line is here\n",
1177                lambda : "",
1178            ])
1179        fo = socket._fileobject(mock_sock, **kwargs)
1180        self.assertEqual(fo.readline(size), "This is the first line\n")
1181        self.assertEqual(fo.readline(size), "And the second line is here\n")
1182
1183    def _test_read(self, size=-1, **kwargs):
1184        mock_sock = self.MockSocket(recv_funcs=[
1185                lambda : "This is the first line\nAnd the sec",
1186                self._raise_eintr,
1187                lambda : "ond line is here\n",
1188                lambda : "",
1189            ])
1190        fo = socket._fileobject(mock_sock, **kwargs)
1191        self.assertEqual(fo.read(size), "This is the first line\n"
1192                          "And the second line is here\n")
1193
1194    def test_default(self):
1195        self._test_readline()
1196        self._test_readline(size=100)
1197        self._test_read()
1198        self._test_read(size=100)
1199
1200    def test_with_1k_buffer(self):
1201        self._test_readline(bufsize=1024)
1202        self._test_readline(size=100, bufsize=1024)
1203        self._test_read(bufsize=1024)
1204        self._test_read(size=100, bufsize=1024)
1205
1206    def _test_readline_no_buffer(self, size=-1):
1207        mock_sock = self.MockSocket(recv_funcs=[
1208                lambda : "aa",
1209                lambda : "\n",
1210                lambda : "BB",
1211                self._raise_eintr,
1212                lambda : "bb",
1213                lambda : "",
1214            ])
1215        fo = socket._fileobject(mock_sock, bufsize=0)
1216        self.assertEqual(fo.readline(size), "aa\n")
1217        self.assertEqual(fo.readline(size), "BBbb")
1218
1219    def test_no_buffer(self):
1220        self._test_readline_no_buffer()
1221        self._test_readline_no_buffer(size=4)
1222        self._test_read(bufsize=0)
1223        self._test_read(size=100, bufsize=0)
1224
1225
class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):

    """Repeat the tests from FileObjectClassTestCase with bufsize==0.

    In this case (and in this case only), it should be possible to
    create a file object, read a line from it, create another file
    object, read another line from it, without loss of data in the
    first file object's buffer.  Note that httplib relies on this
    when reading multiple requests from the same socket."""

    bufsize = 0 # Use unbuffered mode

    def testUnbufferedReadline(self):
        # Read the first line, swap in a brand new unbuffered file object
        # on the same connection, then read the second line through it
        first = self.serv_file.readline()
        self.assertEqual(first, "A. " + MSG)
        self.serv_file = self.cli_conn.makefile('rb', 0)
        second = self.serv_file.readline()
        self.assertEqual(second, "B. " + MSG)

    def _testUnbufferedReadline(self):
        for label in ("A. ", "B. "):
            self.cli_file.write(label + MSG)
        self.cli_file.flush()
1250
class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase):
    # Repeats the FileObjectClassTestCase tests with bufsize == 1 and adds
    # a check of which chunks a line-buffered writer hands to the socket.

    bufsize = 1 # Default-buffered for reading; line-buffered for writing

    class SocketMemo(object):
        """Socket wrapper that records the data passed to send()/sendall().

        Everything else is delegated to the wrapped socket.  Needed to
        examine write behaviour.
        """
        def __init__(self, sock):
            self._sock = sock
            # One entry per send()/sendall() call, in call order.
            self.sent = []

        def send(self, data, flags=0):
            n = self._sock.send(data, flags)
            # Record only the part the underlying socket accepted.
            self.sent.append(data[:n])
            return n

        def sendall(self, data, flags=0):
            self._sock.sendall(data, flags)
            self.sent.append(data)

        def __getattr__(self, attr):
            # Only called for attributes not found on the memo itself.
            return getattr(self._sock, attr)

        def getsent(self):
            # Return the recorded chunks, converting memoryview entries
            # to byte strings.
            return [e.tobytes() if isinstance(e, memoryview) else e for e in self.sent]

    def setUp(self):
        FileObjectClassTestCase.setUp(self)
        # Interpose the recording wrapper between the file object and the
        # socket it writes to.
        self.serv_file._sock = self.SocketMemo(self.serv_file._sock)

    def testLinebufferedWrite(self):
        # Write three lines (the last one empty), in small chunks
        msg = MSG.strip()
        print >> self.serv_file, msg,
        print >> self.serv_file, msg

        # second line:
        print >> self.serv_file, msg,
        print >> self.serv_file, msg,
        print >> self.serv_file, msg

        # third line
        print >> self.serv_file, ''

        self.serv_file.flush()

        # Each completed line is expected to have reached the socket as
        # exactly one chunk.
        msg1 = "%s %s\n"%(msg, msg)
        msg2 =  "%s %s %s\n"%(msg, msg, msg)
        msg3 =  "\n"
        self.assertEqual(self.serv_file._sock.getsent(), [msg1, msg2, msg3])

    def _testLinebufferedWrite(self):
        # Read the three lines back on the other side and compare them.
        msg = MSG.strip()
        msg1 = "%s %s\n"%(msg, msg)
        msg2 =  "%s %s %s\n"%(msg, msg, msg)
        msg3 =  "\n"
        l1 = self.cli_file.readline()
        self.assertEqual(l1, msg1)
        l2 = self.cli_file.readline()
        self.assertEqual(l2, msg2)
        l3 = self.cli_file.readline()
        self.assertEqual(l3, msg3)
1312
1313
class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):
    """Repeat the tests from FileObjectClassTestCase with bufsize==2."""

    bufsize = 2 # Exercise the buffering code
1317
1318
class NetworkConnectionTest(object):
    """Mixin whose client side connects through socket.create_connection()."""

    def clientSetUp(self):
        # self.port is not defined here: this class is combined below
        # with BasicTCPTest (in BasicTCPTest2), which provides it.
        server_address = (HOST, self.port)
        connection = socket.create_connection(server_address)
        self.cli = connection
        self.serv_conn = connection
1326
class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest):
    """Re-run the BasicTCPTest cases with NetworkConnectionTest.clientSetUp.

    The client side is then created with socket.create_connection(), which
    checks that doing so does not break existing TCP functionality.
    """
1330
1331class NetworkConnectionNoServer(unittest.TestCase):
1332    class MockSocket(socket.socket):
1333        def connect(self, *args):
1334            raise socket.timeout('timed out')
1335
1336    @contextlib.contextmanager
1337    def mocked_socket_module(self):
1338        """Return a socket which times out on connect"""
1339        old_socket = socket.socket
1340        socket.socket = self.MockSocket
1341        try:
1342            yield
1343        finally:
1344            socket.socket = old_socket
1345
1346    def test_connect(self):
1347        port = test_support.find_unused_port()
1348        cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1349        self.addCleanup(cli.close)
1350        with self.assertRaises(socket.error) as cm:
1351            cli.connect((HOST, port))
1352        self.assertEqual(cm.exception.errno, errno.ECONNREFUSED)
1353
1354    def test_create_connection(self):
1355        # Issue #9792: errors raised by create_connection() should have
1356        # a proper errno attribute.
1357        port = test_support.find_unused_port()
1358        with self.assertRaises(socket.error) as cm:
1359            socket.create_connection((HOST, port))
1360
1361        # Issue #16257: create_connection() calls getaddrinfo() against
1362        # 'localhost'.  This may result in an IPV6 addr being returned
1363        # as well as an IPV4 one:
1364        #   >>> socket.getaddrinfo('localhost', port, 0, SOCK_STREAM)
1365        #   >>> [(2,  2, 0, '', ('127.0.0.1', 41230)),
1366        #        (26, 2, 0, '', ('::1', 41230, 0, 0))]
1367        #
1368        # create_connection() enumerates through all the addresses returned
1369        # and if it doesn't successfully bind to any of them, it propagates
1370        # the last exception it encountered.
1371        #
1372        # On Solaris, ENETUNREACH is returned in this circumstance instead
1373        # of ECONNREFUSED.  So, if that errno exists, add it to our list of
1374        # expected errnos.
1375        expected_errnos = [ errno.ECONNREFUSED, ]
1376        if hasattr(errno, 'ENETUNREACH'):
1377            expected_errnos.append(errno.ENETUNREACH)
1378        if hasattr(errno, 'EADDRNOTAVAIL'):
1379            # bpo-31910: socket.create_connection() fails randomly
1380            # with EADDRNOTAVAIL on Travis CI
1381            expected_errnos.append(errno.EADDRNOTAVAIL)
1382
1383        self.assertIn(cm.exception.errno, expected_errnos)
1384
1385    def test_create_connection_timeout(self):
1386        # Issue #9792: create_connection() should not recast timeout errors
1387        # as generic socket errors.
1388        with self.mocked_socket_module():
1389            with self.assertRaises(socket.timeout):
1390                socket.create_connection((HOST, 1234))
1391
1392
@unittest.skipUnless(thread, 'Threading required for this test.')
class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
    # Checks attributes of sockets returned by socket.create_connection().
    # Every testX method is the same accept-and-close routine; the
    # interesting work happens in the _testX counterparts.

    def __init__(self, methodName='runTest'):
        SocketTCPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        # Port used as the local (source) port by _testSourceAddress.
        self.source_port = test_support.find_unused_port()

    def clientTearDown(self):
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)

    def _justAccept(self):
        # Accept one connection and close it immediately.
        conn, addr = self.serv.accept()
        conn.close()

    testFamily = _justAccept
    def _testFamily(self):
        self.cli = socket.create_connection((HOST, self.port), timeout=30)
        self.addCleanup(self.cli.close)
        # Compare against the named constant rather than its numeric value.
        self.assertEqual(self.cli.family, socket.AF_INET)

    testSourceAddress = _justAccept
    def _testSourceAddress(self):
        self.cli = socket.create_connection((HOST, self.port), timeout=30,
                source_address=('', self.source_port))
        self.addCleanup(self.cli.close)
        # The port number being used is sufficient to show that the bind()
        # call happened.
        self.assertEqual(self.cli.getsockname()[1], self.source_port)

    testTimeoutDefault = _justAccept
    def _testTimeoutDefault(self):
        # passing no explicit timeout uses socket's global default
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(42)
        try:
            self.cli = socket.create_connection((HOST, self.port))
            self.addCleanup(self.cli.close)
        finally:
            # Always restore the global default for the other tests.
            socket.setdefaulttimeout(None)
        self.assertEqual(self.cli.gettimeout(), 42)

    testTimeoutNone = _justAccept
    def _testTimeoutNone(self):
        # None timeout means the same as sock.settimeout(None)
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            self.cli = socket.create_connection((HOST, self.port), timeout=None)
            self.addCleanup(self.cli.close)
        finally:
            # Always restore the global default for the other tests.
            socket.setdefaulttimeout(None)
        self.assertIsNone(self.cli.gettimeout())

    testTimeoutValueNamed = _justAccept
    def _testTimeoutValueNamed(self):
        # timeout passed as a keyword argument
        self.cli = socket.create_connection((HOST, self.port), timeout=30)
        self.addCleanup(self.cli.close)
        self.assertEqual(self.cli.gettimeout(), 30)

    testTimeoutValueNonamed = _justAccept
    def _testTimeoutValueNonamed(self):
        # timeout passed as the second positional argument
        self.cli = socket.create_connection((HOST, self.port), 30)
        self.addCleanup(self.cli.close)
        self.assertEqual(self.cli.gettimeout(), 30)
1461
@unittest.skipUnless(thread, 'Threading required for this test.')
class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest):
    # The accepting side waits 3 seconds before sending "done!"; the
    # connecting side receives either without a timeout or with a
    # 1 second one.

    def __init__(self, methodName='runTest'):
        SocketTCPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        pass

    def clientTearDown(self):
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)

    def testInsideTimeout(self):
        accepted, _ = self.serv.accept()
        self.addCleanup(accepted.close)
        time.sleep(3)
        accepted.send("done!")
    testOutsideTimeout = testInsideTimeout

    def _testInsideTimeout(self):
        # No timeout given: the delayed reply must be received.
        client = socket.create_connection((HOST, self.port))
        self.cli = client
        self.assertEqual(client.recv(5), "done!")

    def _testOutsideTimeout(self):
        # 1 second timeout: recv() must give up before the reply is sent.
        client = socket.create_connection((HOST, self.port), timeout=1)
        self.cli = client
        self.assertRaises(socket.timeout, client.recv, 5)
1492
1493
1494class Urllib2FileobjectTest(unittest.TestCase):
1495
1496    # urllib2.HTTPHandler has "borrowed" socket._fileobject, and requires that
1497    # it close the socket if the close c'tor argument is true
1498
1499    def testClose(self):
1500        class MockSocket:
1501            closed = False
1502            def flush(self): pass
1503            def close(self): self.closed = True
1504
1505        # must not close unless we request it: the original use of _fileobject
1506        # by module socket requires that the underlying socket not be closed until
1507        # the _socketobject that created the _fileobject is closed
1508        s = MockSocket()
1509        f = socket._fileobject(s)
1510        f.close()
1511        self.assertTrue(not s.closed)
1512
1513        s = MockSocket()
1514        f = socket._fileobject(s, close=True)
1515        f.close()
1516        self.assertTrue(s.closed)
1517
class TCPTimeoutTest(SocketTCPTest):
    # Timeout behaviour of accept() on the listening socket self.serv.

    def testTCPTimeout(self):
        # With a 1 second timeout and nobody connecting, accept() must
        # raise socket.timeout.  The check is spelled out with try/except
        # so that the failure message is actually reported.
        self.serv.settimeout(1.0)
        try:
            self.serv.accept()
        except socket.timeout:
            pass
        else:
            self.fail("Error generating a timeout exception (TCP)")

    def testTimeoutZero(self):
        # A zero timeout means non-blocking mode: accept() must fail with
        # a plain socket.error, not with socket.timeout.
        try:
            self.serv.settimeout(0.0)
            conn, _ = self.serv.accept()
        except socket.timeout:
            # Must be tested before socket.error, which would also match.
            self.fail("caught timeout instead of error (TCP)")
        except socket.error:
            pass
        except Exception:
            self.fail("caught unexpected exception (TCP)")
        else:
            conn.close()
            self.fail("accept() returned success when we did not expect it")

    @unittest.skipUnless(hasattr(signal, 'alarm'),
                         'test needs signal.alarm()')
    def testInterruptedTimeout(self):
        # XXX I don't know how to do this test on MSWindows or any other
        # platform that doesn't support signal.alarm() or os.kill(), though
        # the bug should have existed on all platforms.
        self.serv.settimeout(5.0)   # must be longer than alarm
        class Alarm(Exception):
            pass
        def alarm_handler(signum, frame):
            raise Alarm
        old_alarm = signal.signal(signal.SIGALRM, alarm_handler)
        try:
            try:
                signal.alarm(2)    # POSIX allows alarm to be up to 1 second early
                self.serv.accept()
            except socket.timeout:
                self.fail("caught timeout instead of Alarm")
            except Alarm:
                # Expected: the signal interrupted the blocked accept().
                pass
            except Exception:
                self.fail("caught other exception instead of Alarm:"
                          " %s(%s):\n%s" %
                          (sys.exc_info()[:2] + (traceback.format_exc(),)))
            else:
                self.fail("nothing caught")
            finally:
                signal.alarm(0)         # shut off alarm
        except Alarm:
            # The alarm fired outside the accept() call guarded above.
            self.fail("got Alarm in wrong place")
        finally:
            # no alarm can be pending.  Safe to restore old handler.
            signal.signal(signal.SIGALRM, old_alarm)
1574
class UDPTimeoutTest(SocketUDPTest):
    # Timeout behaviour of recv() on the datagram socket self.serv.

    def testUDPTimeout(self):
        # With a 1 second timeout and no incoming datagram, recv() must
        # raise socket.timeout.  The check is spelled out with try/except
        # so that the failure message is actually reported.
        self.serv.settimeout(1.0)
        try:
            self.serv.recv(1024)
        except socket.timeout:
            pass
        else:
            self.fail("Error generating a timeout exception (UDP)")

    def testTimeoutZero(self):
        # A zero timeout means non-blocking mode: recv() must fail with a
        # plain socket.error, not with socket.timeout.
        try:
            self.serv.settimeout(0.0)
            self.serv.recv(1024)
        except socket.timeout:
            # Must be tested before socket.error, which would also match.
            self.fail("caught timeout instead of error (UDP)")
        except socket.error:
            pass
        except Exception:
            self.fail("caught unexpected exception (UDP)")
        else:
            self.fail("recv() returned success when we did not expect it")
1597
class TestExceptions(unittest.TestCase):

    def testExceptionTree(self):
        # (exception class, required base class) pairs describing the
        # socket exception hierarchy
        hierarchy = (
            (socket.error, Exception),
            (socket.herror, socket.error),
            (socket.gaierror, socket.error),
            (socket.timeout, socket.error),
        )
        for derived, base in hierarchy:
            self.assertTrue(issubclass(derived, base))
1605
1606@unittest.skipUnless(sys.platform == 'linux', 'Linux specific test')
1607class TestLinuxAbstractNamespace(unittest.TestCase):
1608
1609    UNIX_PATH_MAX = 108
1610
1611    def testLinuxAbstractNamespace(self):
1612        address = "\x00python-test-hello\x00\xff"
1613        s1 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1614        s1.bind(address)
1615        s1.listen(1)
1616        s2 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1617        s2.connect(s1.getsockname())
1618        s1.accept()
1619        self.assertEqual(s1.getsockname(), address)
1620        self.assertEqual(s2.getpeername(), address)
1621
1622    def testMaxName(self):
1623        address = "\x00" + "h" * (self.UNIX_PATH_MAX - 1)
1624        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1625        s.bind(address)
1626        self.assertEqual(s.getsockname(), address)
1627
1628    def testNameOverflow(self):
1629        address = "\x00" + "h" * self.UNIX_PATH_MAX
1630        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1631        self.assertRaises(socket.error, s.bind, address)
1632
1633
@unittest.skipUnless(thread, 'Threading required for this test.')
class BufferIOTest(SocketConnectedTest):
    """
    Check recv_into() and recvfrom_into() against several writable buffers.

    Every testXxx method receives on self.cli_conn; its _testXxx
    counterpart sends MSG, wrapped in a buffer object, on self.serv_conn.
    """
    def __init__(self, methodName='runTest'):
        SocketConnectedTest.__init__(self, methodName=methodName)

    # -- shared helpers ---------------------------------------------------

    def _sendMsgAsBuffer(self):
        """Send MSG through a buffer() object on the sending connection."""
        # buffer() triggers a py3k warning; the context manager accounts
        # for it.
        with test_support.check_py3k_warnings():
            payload = buffer(MSG)
        self.serv_conn.send(payload)

    def _assertHoldsMsg(self, count, data):
        """Check the reported byte count, then the leading bytes of data."""
        expected = len(MSG)
        self.assertEqual(count, expected)
        self.assertEqual(data[:expected], MSG)

    # -- recv_into --------------------------------------------------------

    def testRecvIntoArray(self):
        target = array.array('c', ' ' * 1024)
        count = self.cli_conn.recv_into(target)
        self._assertHoldsMsg(count, target.tostring())

    def _testRecvIntoArray(self):
        self._sendMsgAsBuffer()

    def testRecvIntoBytearray(self):
        target = bytearray(1024)
        count = self.cli_conn.recv_into(target)
        self._assertHoldsMsg(count, target)

    _testRecvIntoBytearray = _testRecvIntoArray

    def testRecvIntoMemoryview(self):
        target = bytearray(1024)
        view = memoryview(target)
        count = self.cli_conn.recv_into(view)
        self._assertHoldsMsg(count, target)

    _testRecvIntoMemoryview = _testRecvIntoArray

    # -- recvfrom_into ----------------------------------------------------

    def testRecvFromIntoArray(self):
        target = array.array('c', ' ' * 1024)
        count, _ = self.cli_conn.recvfrom_into(target)
        self._assertHoldsMsg(count, target.tostring())

    def _testRecvFromIntoArray(self):
        self._sendMsgAsBuffer()

    def testRecvFromIntoBytearray(self):
        target = bytearray(1024)
        count, _ = self.cli_conn.recvfrom_into(target)
        self._assertHoldsMsg(count, target)

    _testRecvFromIntoBytearray = _testRecvFromIntoArray

    def testRecvFromIntoMemoryview(self):
        target = bytearray(1024)
        view = memoryview(target)
        count, _ = self.cli_conn.recvfrom_into(view)
        self._assertHoldsMsg(count, target)

    _testRecvFromIntoMemoryview = _testRecvFromIntoArray

    def testRecvFromIntoSmallBuffer(self):
        # See issue #20246.
        # Asking for more bytes than the buffer can hold must be refused.
        tiny = bytearray(8)
        with self.assertRaises(ValueError):
            self.cli_conn.recvfrom_into(tiny, 1024)

    def _testRecvFromIntoSmallBuffer(self):
        self._sendMsgAsBuffer()

    def testRecvFromIntoEmptyBuffer(self):
        # A zero-length buffer must be accepted, with and without an
        # explicit size of 0.
        empty = bytearray()
        self.cli_conn.recvfrom_into(empty)
        self.cli_conn.recvfrom_into(empty, 0)

    _testRecvFromIntoEmptyBuffer = _testRecvFromIntoArray
1718
1719
# Fields of the TIPC addresses used by the TIPC tests below: the servers
# bind to (TIPC_ADDR_NAMESEQ, TIPC_STYPE, TIPC_LOWER, TIPC_UPPER) and the
# clients target the midpoint between TIPC_LOWER and TIPC_UPPER.
TIPC_STYPE = 2000
TIPC_LOWER, TIPC_UPPER = 200, 210
1723
def isTipcAvailable():
    """Report whether TIPC sockets can be used on this machine.

    Returns False when the socket module has no AF_TIPC constant, when
    /proc/modules cannot be opened because it is missing, is a directory
    or is not readable, or when no line of that file starts with "tipc ".
    Returns True as soon as such a line is found.  Any other IOError
    raised while opening the file is propagated.

    The TIPC module is not loaded automatically on Ubuntu and probably
    other Linux distros, hence the explicit check.
    """
    if not hasattr(socket, "AF_TIPC"):
        return False

    try:
        modules = open("/proc/modules")
    except IOError as exc:
        # Only "cannot look at the module list" conditions are tolerated;
        # everything else is a genuine failure and must surface.
        tolerated = (errno.ENOENT, errno.EISDIR, errno.EACCES)
        if exc.errno not in tolerated:
            raise
        return False

    with modules:
        return any(entry.startswith("tipc ") for entry in modules)
1747
@unittest.skipUnless(isTipcAvailable(),
                     "TIPC module is not loaded, please 'sudo modprobe tipc'")
class TIPCTest(unittest.TestCase):
    """Single-threaded round trip over TIPC SOCK_RDM sockets."""

    def testRDM(self):
        """Send MSG to a bound name sequence and check what arrives.

        The receiver binds to the range TIPC_LOWER..TIPC_UPPER of
        TIPC_STYPE; the sender addresses the middle of that range.  The
        received payload must equal MSG and the reported source address
        must equal the sender's own socket name.
        """
        srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
        # Close both sockets even when an assertion or a socket call fails.
        self.addCleanup(srv.close)
        cli = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
        self.addCleanup(cli.close)

        srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
                TIPC_LOWER, TIPC_UPPER)
        srv.bind(srvaddr)

        # Floor division keeps the instance number an integer.
        sendaddr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
                TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) // 2, 0)
        cli.sendto(MSG, sendaddr)

        msg, recvaddr = srv.recvfrom(1024)

        self.assertEqual(cli.getsockname(), recvaddr)
        self.assertEqual(msg, MSG)
1768
1769
@unittest.skipUnless(isTipcAvailable(),
                     "TIPC module is not loaded, please 'sudo modprobe tipc'")
class TIPCThreadableTest(unittest.TestCase, ThreadableTest):
    """Threaded client/server exchange over TIPC SOCK_STREAM sockets.

    setUp() builds the listening side and accepts one connection;
    clientSetUp() builds the connecting side.  Every socket created here
    is registered with addCleanup() so that it is closed even when a
    later step raises.
    """

    def __init__(self, methodName = 'runTest'):
        unittest.TestCase.__init__(self, methodName = methodName)
        ThreadableTest.__init__(self)

    def setUp(self):
        """Bind and listen on the TIPC name sequence, then accept a peer."""
        self.srv = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
        self.addCleanup(self.srv.close)
        self.srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
                TIPC_LOWER, TIPC_UPPER)
        self.srv.bind(srvaddr)
        self.srv.listen(5)
        self.serverExplicitReady()
        self.conn, self.connaddr = self.srv.accept()
        self.addCleanup(self.conn.close)

    def clientSetUp(self):
        """Connect to the middle of the served range and record our name."""
        # There is a hittable race between serverExplicitReady() and the
        # accept() call; sleep a little while to avoid it, otherwise
        # we could get an exception
        time.sleep(0.1)
        self.cli = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
        # _testStream() closes this socket on success; the cleanup covers
        # the paths where connect() or a later step fails first.
        self.addCleanup(self.cli.close)
        # Floor division keeps the instance number an integer.
        addr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
                TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) // 2, 0)
        self.cli.connect(addr)
        self.cliaddr = self.cli.getsockname()

    def testStream(self):
        """The accepted connection receives MSG from the expected peer."""
        msg = self.conn.recv(1024)
        self.assertEqual(msg, MSG)
        self.assertEqual(self.cliaddr, self.connaddr)

    def _testStream(self):
        """Counterpart of testStream: send MSG and close the socket."""
        self.cli.send(MSG)
        self.cli.close()
1806
1807
def test_main():
    """Run every test case class of this module through test_support.

    The classes are handed to run_unittest() in a fixed order, bracketed
    by threading_setup() and threading_cleanup().
    """
    tests = [
        # Core module, TCP and UDP behaviour.
        GeneralModuleTests,
        BasicTCPTest,
        TCPCloserTest,
        TCPTimeoutTest,
        TestExceptions,
        BufferIOTest,
        BasicTCPTest2,
        BasicUDPTest,
        UDPTimeoutTest,
        # Non-blocking sockets, file objects and create_connection().
        NonBlockingTCPTests,
        FileObjectClassTestCase,
        FileObjectInterruptedTestCase,
        UnbufferedFileObjectClassTestCase,
        LineBufferedFileObjectClassTestCase,
        SmallBufferedFileObjectClassTestCase,
        Urllib2FileobjectTest,
        NetworkConnectionNoServer,
        NetworkConnectionAttributesTest,
        NetworkConnectionBehaviourTest,
        # Remaining cases: socket pairs, abstract namespace and TIPC.
        BasicSocketPairTest,
        TestLinuxAbstractNamespace,
        TIPCTest,
        TIPCThreadableTest,
    ]

    thread_info = test_support.threading_setup()
    test_support.run_unittest(*tests)
    test_support.threading_cleanup(*thread_info)
1832
# Run the whole suite when this file is executed as a script.
if __name__ == '__main__':
    test_main()
1835