1import unittest
2from test import support
3
4import errno
5import io
6import itertools
7import socket
8import select
9import tempfile
10import time
11import traceback
12import queue
13import sys
14import os
15import array
16import contextlib
17from weakref import proxy
18import signal
19import math
20import pickle
21import struct
22import random
23import string
24try:
25    import multiprocessing
26except ImportError:
27    multiprocessing = False
28try:
29    import fcntl
30except ImportError:
31    fcntl = None
32
# Host address used by the fixtures below; taken from the shared
# test-support module.
HOST = support.HOST
# Payload bytes: the text deliberately contains a non-ASCII character and
# ends in CR LF before being encoded as UTF-8.
MSG = 'Michael Gilfix was here\u1234\r\n'.encode('utf-8') ## test unicode string and carriage return
# NOTE(review): presumably an upper bound, in seconds, for blocking waits
# in tests further down the file -- confirm against its users.
MAIN_TIMEOUT = 60.0
36
37try:
38    import _thread as thread
39    import threading
40except ImportError:
41    thread = None
42    threading = None
43try:
44    import _socket
45except ImportError:
46    _socket = None
47
48
def _have_socket_can():
    """Return True if a raw CAN socket can be created on this host.

    A missing socket constant (AttributeError) and a refusal from the
    operating system (OSError) both count as "not supported".
    """
    try:
        probe = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
    except (AttributeError, OSError):
        return False
    # The socket was only a probe; release it straight away.
    probe.close()
    return True
58
def _have_socket_rds():
    """Return True if an RDS sequenced-packet socket can be created here.

    A missing socket constant (AttributeError) and a refusal from the
    operating system (OSError) both count as "not supported".
    """
    try:
        probe = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
    except (AttributeError, OSError):
        return False
    # The socket was only a probe; release it straight away.
    probe.close()
    return True
68
def _have_socket_alg():
    """Return True if an AF_ALG sequenced-packet socket can be created here.

    A missing socket constant (AttributeError) and a refusal from the
    operating system (OSError) both count as "not supported".
    """
    try:
        probe = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
    except (AttributeError, OSError):
        return False
    # The socket was only a probe; release it straight away.
    probe.close()
    return True
78
# Each probe below runs once, when this module is imported, and records
# whether the corresponding socket family could be opened on this host.
# NOTE(review): presumably consulted by skip decorators further down the
# file -- confirm against their users.
HAVE_SOCKET_CAN = _have_socket_can()

HAVE_SOCKET_RDS = _have_socket_rds()

HAVE_SOCKET_ALG = _have_socket_alg()

# Size in bytes of the int type
SIZEOF_INT = array.array("i").itemsize
87
class SocketTCPTest(unittest.TestCase):
    """Fixture that provides a listening IPv4 TCP server socket.

    setUp() stores the socket in self.serv and the value returned by
    support.bind_port() in self.port; tearDown() closes the socket.
    """

    def setUp(self):
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.serv = listener
        self.port = support.bind_port(listener)
        listener.listen()

    def tearDown(self):
        # Close first, then drop the reference so the closed socket cannot
        # be picked up again through self.serv.
        listener = self.serv
        listener.close()
        self.serv = None
98
class SocketUDPTest(unittest.TestCase):
    """Fixture that provides a bound IPv4 UDP server socket.

    setUp() stores the socket in self.serv and the value returned by
    support.bind_port() in self.port; tearDown() closes the socket.
    """

    def setUp(self):
        receiver = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.serv = receiver
        self.port = support.bind_port(receiver)

    def tearDown(self):
        # Close first, then drop the reference so the closed socket cannot
        # be picked up again through self.serv.
        receiver = self.serv
        receiver.close()
        self.serv = None
108
class ThreadSafeCleanupTestCase(unittest.TestCase):
    """unittest.TestCase whose cleanup bookkeeping is guarded by a lock.

    When the threading module is available, addCleanup() and doCleanups()
    run while holding a per-instance recursive lock.  Without threading
    the class adds nothing to unittest.TestCase.
    """

    if threading:
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            # Recursive, so a cleanup that registers another cleanup from
            # the same thread does not deadlock.
            self._cleanup_lock = threading.RLock()

        def addCleanup(self, *args, **kwargs):
            with self._cleanup_lock:
                outcome = super().addCleanup(*args, **kwargs)
            return outcome

        def doCleanups(self, *args, **kwargs):
            with self._cleanup_lock:
                outcome = super().doCleanups(*args, **kwargs)
            return outcome
128
class SocketCANTest(unittest.TestCase):

    """To be able to run this test, a `vcan0` CAN interface can be created with
    the following commands:
    # modprobe vcan
    # ip link add dev vcan0 type vcan
    # ifconfig vcan0 up
    """
    # Name of the CAN interface the fixture binds to.
    interface = 'vcan0'
    bufsize = 128

    # The CAN frame structure is defined in <linux/can.h>:
    #
    # struct can_frame {
    #     canid_t can_id;  /* 32 bit CAN_ID + EFF/RTR/ERR flags */
    #     __u8    can_dlc; /* data length code: 0 .. 8 */
    #     __u8    data[8] __attribute__((aligned(8)));
    # };
    can_frame_fmt = "=IB3x8s"
    can_frame_size = struct.calcsize(can_frame_fmt)

    # The Broadcast Management Command frame structure is defined
    # in <linux/can/bcm.h>:
    #
    # struct bcm_msg_head {
    #     __u32 opcode;
    #     __u32 flags;
    #     __u32 count;
    #     struct timeval ival1, ival2;
    #     canid_t can_id;
    #     __u32 nframes;
    #     struct can_frame frames[0];
    # }
    #
    # `bcm_msg_head` must be 8 bytes aligned because of the `frames` member
    # (see `struct can_frame` definition). Must use native not standard
    # types for packing.
    bcm_cmd_msg_fmt = "@3I4l2I"
    # Pad up to the next multiple of 8 bytes.  (-size % 8) is the distance
    # to that boundary; (size % 8) only equals it when the remainder is
    # 0 or 4.
    bcm_cmd_msg_fmt += "x" * (-struct.calcsize(bcm_cmd_msg_fmt) % 8)

    def setUp(self):
        """Open a raw CAN socket bound to `interface`, or skip the test."""
        self.s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
        # Registered before bind() so the socket is closed even when the
        # test ends up being skipped.
        self.addCleanup(self.s.close)
        try:
            self.s.bind((self.interface,))
        except OSError:
            self.skipTest('network interface `%s` does not exist' %
                           self.interface)
178
179
class SocketRDSTest(unittest.TestCase):

    """To be able to run this test, the `rds` kernel module must be loaded:
    # modprobe rds
    """
    bufsize = 8192

    def setUp(self):
        rds_sock = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
        self.serv = rds_sock
        # Registered before binding so the socket is closed even when the
        # test ends up being skipped.
        self.addCleanup(rds_sock.close)
        try:
            self.port = support.bind_port(rds_sock)
        except OSError:
            self.skipTest('unable to bind RDS socket')
194
195
class ThreadableTest:
    """Threadable Test class

    The ThreadableTest class makes it easy to create a threaded
    client/server pair from an existing unit test. To create a
    new threaded class from an existing unit test, use multiple
    inheritance:

        class NewClass (OldClass, ThreadableTest):
            pass

    This class defines two new fixture functions with obvious
    purposes for overriding:

        clientSetUp ()
        clientTearDown ()

    Any new test functions within the class must then define
    tests in pairs, where the test name is preceded with a
    '_' to indicate the client portion of the test. Ex:

        def testFoo(self):
            # Server portion

        def _testFoo(self):
            # Client portion

    Any exceptions raised by the clients during their tests
    are caught and transferred to the main thread to alert
    the testing framework.

    Note, the server setup function cannot call any blocking
    functions that rely on the client thread during setup,
    unless serverExplicitReady() is called just before
    the blocking call (such as in setting up a client/server
    connection and performing the accept() in setUp()).
    """

    def __init__(self):
        """Replace setUp/tearDown on this instance with the threaded wrappers."""
        # Swap the true setup function
        # The original bound methods are kept under name-mangled attributes
        # so that _setUp() and _tearDown() can still call them.
        self.__setUp = self.setUp
        self.__tearDown = self.tearDown
        self.setUp = self._setUp
        self.tearDown = self._tearDown

    def serverExplicitReady(self):
        """This method allows the server to explicitly indicate that
        it wants the client thread to proceed. This is useful if the
        server is about to execute a blocking routine that is
        dependent upon the client thread during its setup routine."""
        self.server_ready.set()

    def _setUp(self):
        """Start the client thread, run the real setUp(), then wait for the client.

        Any exception raised by the real setUp() is re-raised after
        server_crashed has been set and the client has been released.
        """
        # Entered by hand because the matching __exit__ happens in a
        # different method (_tearDown).
        self.wait_threads = support.wait_threads_exit()
        self.wait_threads.__enter__()

        self.server_ready = threading.Event()
        self.client_ready = threading.Event()
        self.done = threading.Event()
        # Holds at most one exception handed over by the client thread.
        self.queue = queue.Queue(1)
        self.server_crashed = False

        # Do some munging to start the client test.
        # The client half is the method whose name is '_' plus the last
        # dotted component of this test's id().
        methodname = self.id()
        i = methodname.rfind('.')
        methodname = methodname[i+1:]
        test_method = getattr(self, '_' + methodname)
        self.client_thread = thread.start_new_thread(
            self.clientRun, (test_method,))

        try:
            self.__setUp()
        except:
            # Tell the client not to run its half of the test.
            self.server_crashed = True
            raise
        finally:
            # Always release the client, which blocks on server_ready at
            # the top of clientRun().
            self.server_ready.set()
        # Only reached when the real setUp() succeeded.
        self.client_ready.wait()

    def _tearDown(self):
        """Run the real tearDown(), wait for the client, re-raise its exception."""
        self.__tearDown()
        # clientTearDown() sets this event.
        self.done.wait()
        self.wait_threads.__exit__(None, None, None)

        # An exception queued by clientRun() is raised here, in the thread
        # that called tearDown().
        if self.queue.qsize():
            exc = self.queue.get()
            raise exc

    def clientRun(self, test_func):
        """Body of the client thread: set up, run test_func, tear down.

        Exceptions from clientSetUp() and from test_func are put on
        self.queue rather than propagated.
        """
        self.server_ready.wait()
        try:
            self.clientSetUp()
        except BaseException as e:
            self.queue.put(e)
            self.clientTearDown()
            return
        finally:
            # Set whether or not clientSetUp() succeeded, so the wait at
            # the end of _setUp() is always released.
            self.client_ready.set()
        if self.server_crashed:
            # The server-side setUp() failed; skip the client test body.
            self.clientTearDown()
            return
        if not hasattr(test_func, '__call__'):
            raise TypeError("test_func must be a callable function")
        try:
            test_func()
        except BaseException as e:
            self.queue.put(e)
        finally:
            self.clientTearDown()

    def clientSetUp(self):
        """Client-side fixture hook; must be overridden."""
        raise NotImplementedError("clientSetUp must be implemented.")

    def clientTearDown(self):
        """Signal that the client is finished and end the calling thread."""
        # _tearDown() waits on this event.
        self.done.set()
        thread.exit()
312
class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest):
    """TCP server fixture paired with a client thread.

    The server side comes from SocketTCPTest; the client thread gets an
    unconnected IPv4 TCP socket in self.cli.
    """

    def __init__(self, methodName='runTest'):
        # The two bases take different arguments, so each initialiser is
        # called explicitly.
        SocketTCPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    def clientTearDown(self):
        client = self.cli
        client.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
326
class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest):
    """UDP server fixture paired with a client thread.

    The server side comes from SocketUDPTest; the client thread gets an
    unbound IPv4 UDP socket in self.cli.
    """

    def __init__(self, methodName='runTest'):
        # The two bases take different arguments, so each initialiser is
        # called explicitly.
        SocketUDPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def clientTearDown(self):
        client = self.cli
        client.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
340
class ThreadedCANSocketTest(SocketCANTest, ThreadableTest):
    """CAN server fixture paired with a client thread.

    The client thread gets its own raw CAN socket in self.cli, bound to
    the same interface as the server when that is possible.
    """

    def __init__(self, methodName='runTest'):
        # The two bases take different arguments, so each initialiser is
        # called explicitly.
        SocketCANTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        client = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
        self.cli = client
        try:
            client.bind((self.interface,))
        except OSError:
            # skipTest should not be called here, and will be called in the
            # server instead
            pass

    def clientTearDown(self):
        client = self.cli
        client.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
360
class ThreadedRDSSocketTest(SocketRDSTest, ThreadableTest):
    """RDS server fixture paired with a client thread.

    The client thread gets its own RDS socket in self.cli and, when the
    bind succeeds, its address in self.cli_addr.
    """

    def __init__(self, methodName='runTest'):
        # The two bases take different arguments, so each initialiser is
        # called explicitly.
        SocketRDSTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        client = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
        self.cli = client
        try:
            # RDS sockets must be bound explicitly to send or receive data
            client.bind((HOST, 0))
            self.cli_addr = client.getsockname()
        except OSError:
            # skipTest should not be called here, and will be called in the
            # server instead
            pass

    def clientTearDown(self):
        client = self.cli
        client.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
382
class SocketConnectedTest(ThreadedTCPSocketTest):
    """Socket tests for client-server connection.

    self.cli_conn is the server-side socket returned by accept(); on the
    client side self.serv_conn is the client socket after it has
    connected to (HOST, self.port).
    """

    def __init__(self, methodName='runTest'):
        ThreadedTCPSocketTest.__init__(self, methodName=methodName)

    def setUp(self):
        ThreadedTCPSocketTest.setUp(self)
        # accept() blocks until the client connects, so the client thread
        # has to be released before calling it.
        self.serverExplicitReady()
        self.cli_conn, _peer_addr = self.serv.accept()

    def tearDown(self):
        accepted = self.cli_conn
        accepted.close()
        self.cli_conn = None
        ThreadedTCPSocketTest.tearDown(self)

    def clientSetUp(self):
        ThreadedTCPSocketTest.clientSetUp(self)
        client = self.cli
        client.connect((HOST, self.port))
        self.serv_conn = client

    def clientTearDown(self):
        connection = self.serv_conn
        connection.close()
        self.serv_conn = None
        ThreadedTCPSocketTest.clientTearDown(self)
415
class SocketPairTest(unittest.TestCase, ThreadableTest):
    """Client/server fixture built on socket.socketpair().

    setUp() creates both ends at once: self.serv for the server side and
    self.cli for the client thread.
    """

    def __init__(self, methodName='runTest'):
        # The two bases take different arguments, so each initialiser is
        # called explicitly.
        unittest.TestCase.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def setUp(self):
        server_end, client_end = socket.socketpair()
        self.serv = server_end
        self.cli = client_end

    def tearDown(self):
        server_end = self.serv
        server_end.close()
        self.serv = None

    def clientSetUp(self):
        # Nothing to do: the client end was already created by setUp().
        pass

    def clientTearDown(self):
        client_end = self.cli
        client_end.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
436
437
438# The following classes are used by the sendmsg()/recvmsg() tests.
439# Combining, for instance, ConnectedStreamTestMixin and TCPTestBase
440# gives a drop-in replacement for SocketConnectedTest, but different
441# address families can be used, and the attributes serv_addr and
442# cli_addr will be set to the addresses of the endpoints.
443
class SocketTestBase(unittest.TestCase):
    """Common server-socket fixture for the socket tests.

    Subclasses supply newSocket(), returning a fresh socket, and
    bindSock(sock), binding it to an unused address.

    setUp() stores the server socket in self.serv and its bound address
    in self.serv_addr; tearDown() closes the socket.
    """

    def setUp(self):
        self.serv = self.newSocket()
        self.bindServer()

    def bindServer(self):
        """Bind the server socket and record its address in self.serv_addr."""
        server = self.serv
        self.bindSock(server)
        self.serv_addr = server.getsockname()

    def tearDown(self):
        server = self.serv
        server.close()
        self.serv = None
465
466
class SocketListeningTestMixin(SocketTestBase):
    """Mixin that puts the bound server socket into listening mode."""

    def setUp(self):
        # The base class creates and binds self.serv; only listen() is
        # added here.
        super().setUp()
        server = self.serv
        server.listen()
473
474
class ThreadedSocketTestMixin(ThreadSafeCleanupTestCase, SocketTestBase,
                              ThreadableTest):
    """Mixin adding a client socket so client/server tests can be written.

    The client thread gets its socket in self.cli and that socket's bound
    address in self.cli_addr.  ThreadableTest describes how the paired
    test methods are run.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # ThreadableTest takes no arguments, so it is initialised
        # separately from the cooperative call above.
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        self.cli = self.newClientSocket()
        self.bindClient()

    def newClientSocket(self):
        """Create the client socket; by default the same kind as the server."""
        return self.newSocket()

    def bindClient(self):
        """Bind the client socket and record its address in self.cli_addr."""
        client = self.cli
        self.bindSock(client)
        self.cli_addr = client.getsockname()

    def clientTearDown(self):
        client = self.cli
        client.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
504
505
class ConnectedStreamTestMixin(SocketListeningTestMixin,
                               ThreadedSocketTestMixin):
    """Mixin for client/server stream tests that start out connected.

    On the server side self.cli_conn is the socket returned by accept();
    on the client side self.serv_conn is the client socket after it has
    connected to self.serv_addr.  (Based on SocketConnectedTest.)
    """

    def setUp(self):
        super().setUp()
        # accept() blocks until the client connects, so the client thread
        # has to be released before calling it.
        self.serverExplicitReady()
        self.cli_conn, _peer_addr = self.serv.accept()

    def tearDown(self):
        accepted = self.cli_conn
        accepted.close()
        self.cli_conn = None
        super().tearDown()

    def clientSetUp(self):
        super().clientSetUp()
        client = self.cli
        client.connect(self.serv_addr)
        self.serv_conn = client

    def clientTearDown(self):
        # An AttributeError raised here (for instance when serv_conn was
        # never assigned) is ignored so the remaining teardown still runs.
        with contextlib.suppress(AttributeError):
            self.serv_conn.close()
            self.serv_conn = None
        super().clientTearDown()
540
541
class UnixSocketTestBase(SocketTestBase):
    """Base class for Unix-domain socket tests."""

    # This class is used for file descriptor passing tests, so we
    # create the sockets in a private directory so that other users
    # can't send anything that might be problematic for a privileged
    # user running the tests.

    def setUp(self):
        private_dir = tempfile.mkdtemp()
        self.dir_path = private_dir
        # Registered before the base setUp() so the directory cleanup is
        # in place before any socket path is created inside it.
        self.addCleanup(os.rmdir, private_dir)
        super().setUp()

    def bindSock(self, sock):
        """Bind sock to a fresh path inside the private directory."""
        sock_path = tempfile.mktemp(dir=self.dir_path)
        support.bind_unix_socket(sock, sock_path)
        self.addCleanup(support.unlink, sock_path)
559
class UnixStreamBase(UnixSocketTestBase):
    """Unix-domain fixture whose sockets are of type SOCK_STREAM."""

    def newSocket(self):
        """Return a new, unbound AF_UNIX stream socket."""
        return socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM)
565
566
class InetTestBase(SocketTestBase):
    """Fixture base for sockets bound to an IPv4 address."""

    # Address handed to support.bind_port(); subclasses may override it.
    host = HOST

    def setUp(self):
        super().setUp()
        # serv_addr was filled in by the base class; its second item is
        # kept separately as self.port.
        bound_address = self.serv_addr
        self.port = bound_address[1]

    def bindSock(self, sock):
        """Bind sock on self.host using support.bind_port()."""
        support.bind_port(sock, host=self.host)
578
class TCPTestBase(InetTestBase):
    """IPv4 fixture whose sockets are TCP (SOCK_STREAM)."""

    def newSocket(self):
        """Return a new, unbound AF_INET stream socket."""
        return socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
584
class UDPTestBase(InetTestBase):
    """IPv4 fixture whose sockets are UDP (SOCK_DGRAM)."""

    def newSocket(self):
        """Return a new, unbound AF_INET datagram socket."""
        return socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
590
class SCTPStreamBase(InetTestBase):
    """IPv4 fixture for SCTP in one-to-one (SOCK_STREAM) mode."""

    def newSocket(self):
        """Return a new, unbound AF_INET stream socket using IPPROTO_SCTP."""
        return socket.socket(family=socket.AF_INET,
                             type=socket.SOCK_STREAM,
                             proto=socket.IPPROTO_SCTP)
597
598
class Inet6TestBase(InetTestBase):
    """Fixture base for sockets bound to an IPv6 address."""

    # Only the bind address differs from InetTestBase.
    host = support.HOSTv6
603
class UDP6TestBase(Inet6TestBase):
    """IPv6 fixture whose sockets are UDP (SOCK_DGRAM)."""

    def newSocket(self):
        """Return a new, unbound AF_INET6 datagram socket."""
        return socket.socket(family=socket.AF_INET6, type=socket.SOCK_DGRAM)
609
610
611# Test-skipping decorators for use with ThreadableTest.
612
def skipWithClientIf(condition, reason):
    """Return a decorator that skips its target when condition is true.

    Besides skipping (via unittest.skip) or not skipping, the decorator
    gives every non-class target a "client_skip" attribute.  That
    attribute is itself a decorator: for a skipped test it replaces the
    decorated function with one that does nothing, otherwise it returns
    the function unchanged.  ThreadableTest users can apply it to the
    client half of a test so that half does not run when the test is
    skipped.
    """
    def _do_nothing(*args, **kwargs):
        pass

    def _skipping(obj):
        skipped = unittest.skip(reason)(obj)
        if not isinstance(obj, type):
            skipped.client_skip = lambda f: _do_nothing
        return skipped

    def _passing(obj):
        # Classes are left untouched, and an existing client_skip (set by
        # an earlier decorator) is never overwritten.
        if isinstance(obj, type) or hasattr(obj, "client_skip"):
            return obj
        obj.client_skip = lambda f: f
        return obj

    if condition:
        return _skipping
    return _passing
634
635
def requireAttrs(obj, *attributes):
    """Return a decorator skipping the test if obj lacks any named attribute.

    The skip reason lists the missing names.  The client_skip attribute
    is set exactly as skipWithClientIf() sets it.
    """
    absent = [attr for attr in attributes if not hasattr(obj, attr)]
    reason = "don't have " + ", ".join(absent)
    # A non-empty list of missing names is what makes the condition true.
    return skipWithClientIf(absent, reason)
644
645
def requireSocket(*args):
    """Return a decorator skipping the test if socket(*args) cannot be made.

    String arguments name attributes of the socket module and are
    replaced by their values; if any such attribute is missing the test
    is skipped without trying to create a socket.  The client_skip
    attribute is set exactly as skipWithClientIf() sets it.
    """
    failure = None
    unknown = [arg for arg in args
               if isinstance(arg, str) and not hasattr(socket, arg)]
    if unknown:
        failure = "don't have " + ", ".join(unknown)
    else:
        resolved = [getattr(socket, arg) if isinstance(arg, str) else arg
                    for arg in args]
        try:
            probe = socket.socket(*resolved)
        except OSError as exc:
            # XXX: check errno?
            failure = str(exc)
        else:
            probe.close()
    described_args = ", ".join(str(arg) for arg in args)
    return skipWithClientIf(
        failure is not None,
        "can't create socket({0}): {1}".format(described_args, failure))
672
673
674#######################################################################
675## Begin Tests
676
677class GeneralModuleTests(unittest.TestCase):
678
679    def test_SocketType_is_socketobject(self):
680        import _socket
681        self.assertTrue(socket.SocketType is _socket.socket)
682        s = socket.socket()
683        self.assertIsInstance(s, socket.SocketType)
684        s.close()
685
686    def test_repr(self):
687        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
688        with s:
689            self.assertIn('fd=%i' % s.fileno(), repr(s))
690            self.assertIn('family=%s' % socket.AF_INET, repr(s))
691            self.assertIn('type=%s' % socket.SOCK_STREAM, repr(s))
692            self.assertIn('proto=0', repr(s))
693            self.assertNotIn('raddr', repr(s))
694            s.bind(('127.0.0.1', 0))
695            self.assertIn('laddr', repr(s))
696            self.assertIn(str(s.getsockname()), repr(s))
697        self.assertIn('[closed]', repr(s))
698        self.assertNotIn('laddr', repr(s))
699
700    @unittest.skipUnless(_socket is not None, 'need _socket module')
701    def test_csocket_repr(self):
702        s = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM)
703        try:
704            expected = ('<socket object, fd=%s, family=%s, type=%s, proto=%s>'
705                        % (s.fileno(), s.family, s.type, s.proto))
706            self.assertEqual(repr(s), expected)
707        finally:
708            s.close()
709        expected = ('<socket object, fd=-1, family=%s, type=%s, proto=%s>'
710                    % (s.family, s.type, s.proto))
711        self.assertEqual(repr(s), expected)
712
713    def test_weakref(self):
714        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
715        p = proxy(s)
716        self.assertEqual(p.fileno(), s.fileno())
717        s.close()
718        s = None
719        support.gc_collect()
720        try:
721            p.fileno()
722        except ReferenceError:
723            pass
724        else:
725            self.fail('Socket proxy still exists')
726
727    def testSocketError(self):
728        # Testing socket module exceptions
729        msg = "Error raising socket exception (%s)."
730        with self.assertRaises(OSError, msg=msg % 'OSError'):
731            raise OSError
732        with self.assertRaises(OSError, msg=msg % 'socket.herror'):
733            raise socket.herror
734        with self.assertRaises(OSError, msg=msg % 'socket.gaierror'):
735            raise socket.gaierror
736
737    def testSendtoErrors(self):
738        # Testing that sendto doesn't mask failures. See #10169.
739        # PyPy note: made the test accept broader messages: PyPy's
740        # messages are equivalent but worded differently.
741        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
742        self.addCleanup(s.close)
743        s.bind(('', 0))
744        sockname = s.getsockname()
745        # 2 args
746        with self.assertRaises(TypeError) as cm:
747            s.sendto('\u2620', sockname)
748        self.assertIn(str(cm.exception),
749                      ["a bytes-like object is required, not 'str'", # cpython
750                       "a bytes-like object is required, not str"]) # pypy
751        with self.assertRaises(TypeError) as cm:
752            s.sendto(5j, sockname)
753        self.assertIn(str(cm.exception),
754                      ["a bytes-like object is required, not 'complex'",
755                       "a bytes-like object is required, not complex"])
756        with self.assertRaises(TypeError) as cm:
757            s.sendto(b'foo', None)
758        self.assertIn('NoneType', str(cm.exception))
759        # 3 args
760        with self.assertRaises(TypeError) as cm:
761            s.sendto('\u2620', 0, sockname)
762        self.assertIn(str(cm.exception),
763                      ["a bytes-like object is required, not 'str'",
764                       "a bytes-like object is required, not str"])
765        with self.assertRaises(TypeError) as cm:
766            s.sendto(5j, 0, sockname)
767        self.assertIn(str(cm.exception),
768                      ["a bytes-like object is required, not 'complex'",
769                       "a bytes-like object is required, not complex"])
770        with self.assertRaises(TypeError) as cm:
771            s.sendto(b'foo', 0, None)
772        self.assertIn('NoneType', str(cm.exception))
773        with self.assertRaises(TypeError) as cm:
774            s.sendto(b'foo', 'bar', sockname)
775        self.assertIn('integer', str(cm.exception))
776        with self.assertRaises(TypeError) as cm:
777            s.sendto(b'foo', None, None)
778        self.assertIn('integer', str(cm.exception))
779        # wrong number of args
780        with self.assertRaises(TypeError) as cm:
781            s.sendto(b'foo')
782        if support.check_impl_detail():
783            self.assertIn(' given)', str(cm.exception))
784        with self.assertRaises(TypeError) as cm:
785            s.sendto(b'foo', 0, sockname, 4)
786        self.assertIn(' given', str(cm.exception))
787
788    def testCrucialConstants(self):
789        # Testing for mission critical constants
790        socket.AF_INET
791        socket.SOCK_STREAM
792        socket.SOCK_DGRAM
793        socket.SOCK_RAW
794        socket.SOCK_RDM
795        socket.SOCK_SEQPACKET
796        socket.SOL_SOCKET
797        socket.SO_REUSEADDR
798
799    def testHostnameRes(self):
800        # Testing hostname resolution mechanisms
801        hostname = socket.gethostname()
802        try:
803            ip = socket.gethostbyname(hostname)
804        except OSError:
805            # Probably name lookup wasn't set up right; skip this test
806            self.skipTest('name lookup failure')
807        self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.")
808        try:
809            hname, aliases, ipaddrs = socket.gethostbyaddr(ip)
810        except OSError:
811            # Probably a similar problem as above; skip this test
812            self.skipTest('name lookup failure')
813        all_host_names = [hostname, hname] + aliases
814        fqhn = socket.getfqdn(ip)
815        if not fqhn in all_host_names:
816            self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))
817
818    def test_host_resolution(self):
819        for addr in [support.HOST, '10.0.0.1', '255.255.255.255']:
820            self.assertEqual(socket.gethostbyname(addr), addr)
821
822        # we don't test support.HOSTv6 because there's a chance it doesn't have
823        # a matching name entry (e.g. 'ip6-localhost')
824        for host in [support.HOST]:
825            self.assertIn(host, socket.gethostbyaddr(host)[2])
826
827    def test_host_resolution_bad_address(self):
828        # These are all malformed IP addresses and expected not to resolve to
829        # any result.  But some ISPs, e.g. AWS, may successfully resolve these
830        # IPs.
831        explanation = (
832            "resolving an invalid IP address did not raise OSError; "
833            "can be caused by a broken DNS server"
834        )
835        for addr in ['0.1.1.~1', '1+.1.1.1', '::1q', '::1::2',
836                     '1:1:1:1:1:1:1:1:1']:
837            with self.assertRaises(OSError):
838                socket.gethostbyname(addr)
839            with self.assertRaises(OSError, msg=explanation):
840                socket.gethostbyaddr(addr)
841
842    @unittest.skipUnless(hasattr(socket, 'sethostname'), "test needs socket.sethostname()")
843    @unittest.skipUnless(hasattr(socket, 'gethostname'), "test needs socket.gethostname()")
844    def test_sethostname(self):
845        oldhn = socket.gethostname()
846        try:
847            socket.sethostname('new')
848        except OSError as e:
849            if e.errno == errno.EPERM:
850                self.skipTest("test should be run as root")
851            else:
852                raise
853        try:
854            # running test as root!
855            self.assertEqual(socket.gethostname(), 'new')
856            # Should work with bytes objects too
857            socket.sethostname(b'bar')
858            self.assertEqual(socket.gethostname(), 'bar')
859        finally:
860            socket.sethostname(oldhn)
861
862    @unittest.skipUnless(hasattr(socket, 'if_nameindex'),
863                         'socket.if_nameindex() not available.')
864    def testInterfaceNameIndex(self):
865        interfaces = socket.if_nameindex()
866        for index, name in interfaces:
867            self.assertIsInstance(index, int)
868            self.assertIsInstance(name, str)
869            # interface indices are non-zero integers
870            self.assertGreater(index, 0)
871            _index = socket.if_nametoindex(name)
872            self.assertIsInstance(_index, int)
873            self.assertEqual(index, _index)
874            _name = socket.if_indextoname(index)
875            self.assertIsInstance(_name, str)
876            self.assertEqual(name, _name)
877
878    @unittest.skipUnless(hasattr(socket, 'if_nameindex'),
879                         'socket.if_nameindex() not available.')
880    def testInvalidInterfaceNameIndex(self):
881        # test nonexistent interface index/name
882        self.assertRaises(OSError, socket.if_indextoname, 0)
883        self.assertRaises(OSError, socket.if_nametoindex, '_DEADBEEF')
884        # test with invalid values
885        self.assertRaises(TypeError, socket.if_nametoindex, 0)
886        self.assertRaises(TypeError, socket.if_indextoname, '_DEADBEEF')
887
888    @unittest.skipUnless(hasattr(sys, 'getrefcount'),
889                         'test needs sys.getrefcount()')
890    def testRefCountGetNameInfo(self):
891        # Testing reference count for getnameinfo
892        try:
893            # On some versions, this loses a reference
894            orig = sys.getrefcount(__name__)
895            socket.getnameinfo(__name__,0)
896        except TypeError:
897            if sys.getrefcount(__name__) != orig:
898                self.fail("socket.getnameinfo loses a reference")
899
900    def testInterpreterCrash(self):
901        # Making sure getnameinfo doesn't crash the interpreter
902        try:
903            # On some versions, this crashes the interpreter.
904            socket.getnameinfo(('x', 0, 0, 0), 0)
905        except OSError:
906            pass
907
908    def testNtoH(self):
909        # This just checks that htons etc. are their own inverse,
910        # when looking at the lower 16 or 32 bits.
911        sizes = {socket.htonl: 32, socket.ntohl: 32,
912                 socket.htons: 16, socket.ntohs: 16}
913        for func, size in sizes.items():
914            mask = (1<<size) - 1
915            for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
916                self.assertEqual(i & mask, func(func(i&mask)) & mask)
917
918            swapped = func(mask)
919            self.assertEqual(swapped & mask, mask)
920            self.assertRaises(OverflowError, func, 1<<34)
921
922    @support.cpython_only
923    def testNtoHErrors(self):
924        good_values = [ 1, 2, 3, 1, 2, 3 ]
925        bad_values = [ -1, -2, -3, -1, -2, -3 ]
926        for k in good_values:
927            socket.ntohl(k)
928            socket.ntohs(k)
929            socket.htonl(k)
930            socket.htons(k)
931        for k in bad_values:
932            self.assertRaises(OverflowError, socket.ntohl, k)
933            self.assertRaises(OverflowError, socket.ntohs, k)
934            self.assertRaises(OverflowError, socket.htonl, k)
935            self.assertRaises(OverflowError, socket.htons, k)
936
937    def testGetServBy(self):
938        eq = self.assertEqual
939        # Find one service that exists, then check all the related interfaces.
940        # I've ordered this by protocols that have both a tcp and udp
941        # protocol, at least for modern Linuxes.
942        if (sys.platform.startswith(('freebsd', 'netbsd', 'gnukfreebsd'))
943            or sys.platform in ('linux', 'darwin')):
944            # avoid the 'echo' service on this platform, as there is an
945            # assumption breaking non-standard port/protocol entry
946            services = ('daytime', 'qotd', 'domain')
947        else:
948            services = ('echo', 'daytime', 'domain')
949        for service in services:
950            try:
951                port = socket.getservbyname(service, 'tcp')
952                break
953            except OSError:
954                pass
955        else:
956            raise OSError
957        # Try same call with optional protocol omitted
958        port2 = socket.getservbyname(service)
959        eq(port, port2)
960        # Try udp, but don't barf if it doesn't exist
961        try:
962            udpport = socket.getservbyname(service, 'udp')
963        except OSError:
964            udpport = None
965        else:
966            eq(udpport, port)
967        # Now make sure the lookup by port returns the same service name
968        eq(socket.getservbyport(port2), service)
969        eq(socket.getservbyport(port, 'tcp'), service)
970        if udpport is not None:
971            eq(socket.getservbyport(udpport, 'udp'), service)
972        # Make sure getservbyport does not accept out of range ports.
973        self.assertRaises(OverflowError, socket.getservbyport, -1)
974        self.assertRaises(OverflowError, socket.getservbyport, 65536)
975
976    def testDefaultTimeout(self):
977        # Testing default timeout
978        # The default timeout should initially be None
979        self.assertEqual(socket.getdefaulttimeout(), None)
980        s = socket.socket()
981        self.assertEqual(s.gettimeout(), None)
982        s.close()
983
984        # Set the default timeout to 10, and see if it propagates
985        socket.setdefaulttimeout(10)
986        self.assertEqual(socket.getdefaulttimeout(), 10)
987        s = socket.socket()
988        self.assertEqual(s.gettimeout(), 10)
989        s.close()
990
991        # Reset the default timeout to None, and see if it propagates
992        socket.setdefaulttimeout(None)
993        self.assertEqual(socket.getdefaulttimeout(), None)
994        s = socket.socket()
995        self.assertEqual(s.gettimeout(), None)
996        s.close()
997
998        # Check that setting it to an invalid value raises ValueError
999        self.assertRaises(ValueError, socket.setdefaulttimeout, -1)
1000
1001        # Check that setting it to an invalid type raises TypeError
1002        self.assertRaises(TypeError, socket.setdefaulttimeout, "spam")
1003
1004    @unittest.skipUnless(hasattr(socket, 'inet_aton'),
1005                         'test needs socket.inet_aton()')
1006    def testIPv4_inet_aton_fourbytes(self):
1007        # Test that issue1008086 and issue767150 are fixed.
1008        # It must return 4 bytes.
1009        self.assertEqual(b'\x00'*4, socket.inet_aton('0.0.0.0'))
1010        self.assertEqual(b'\xff'*4, socket.inet_aton('255.255.255.255'))
1011
1012    @unittest.skipUnless(hasattr(socket, 'inet_pton'),
1013                         'test needs socket.inet_pton()')
1014    def testIPv4toString(self):
1015        from socket import inet_aton as f, inet_pton, AF_INET
1016        g = lambda a: inet_pton(AF_INET, a)
1017
1018        assertInvalid = lambda func,a: self.assertRaises(
1019            (OSError, ValueError), func, a
1020        )
1021
1022        self.assertEqual(b'\x00\x00\x00\x00', f('0.0.0.0'))
1023        self.assertEqual(b'\xff\x00\xff\x00', f('255.0.255.0'))
1024        self.assertEqual(b'\xaa\xaa\xaa\xaa', f('170.170.170.170'))
1025        self.assertEqual(b'\x01\x02\x03\x04', f('1.2.3.4'))
1026        self.assertEqual(b'\xff\xff\xff\xff', f('255.255.255.255'))
1027        assertInvalid(f, '0.0.0.')
1028        assertInvalid(f, '300.0.0.0')
1029        assertInvalid(f, 'a.0.0.0')
1030        assertInvalid(f, '1.2.3.4.5')
1031        assertInvalid(f, '::1')
1032
1033        self.assertEqual(b'\x00\x00\x00\x00', g('0.0.0.0'))
1034        self.assertEqual(b'\xff\x00\xff\x00', g('255.0.255.0'))
1035        self.assertEqual(b'\xaa\xaa\xaa\xaa', g('170.170.170.170'))
1036        self.assertEqual(b'\xff\xff\xff\xff', g('255.255.255.255'))
1037        assertInvalid(g, '0.0.0.')
1038        assertInvalid(g, '300.0.0.0')
1039        assertInvalid(g, 'a.0.0.0')
1040        assertInvalid(g, '1.2.3.4.5')
1041        assertInvalid(g, '::1')
1042
1043    @unittest.skipUnless(hasattr(socket, 'inet_pton'),
1044                         'test needs socket.inet_pton()')
1045    def testIPv6toString(self):
1046        try:
1047            from socket import inet_pton, AF_INET6, has_ipv6
1048            if not has_ipv6:
1049                self.skipTest('IPv6 not available')
1050        except ImportError:
1051            self.skipTest('could not import needed symbols from socket')
1052
1053        if sys.platform == "win32":
1054            try:
1055                inet_pton(AF_INET6, '::')
1056            except OSError as e:
1057                if e.winerror == 10022:
1058                    self.skipTest('IPv6 might not be supported')
1059
1060        f = lambda a: inet_pton(AF_INET6, a)
1061        assertInvalid = lambda a: self.assertRaises(
1062            (OSError, ValueError), f, a
1063        )
1064
1065        self.assertEqual(b'\x00' * 16, f('::'))
1066        self.assertEqual(b'\x00' * 16, f('0::0'))
1067        self.assertEqual(b'\x00\x01' + b'\x00' * 14, f('1::'))
1068        self.assertEqual(
1069            b'\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae',
1070            f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')
1071        )
1072        self.assertEqual(
1073            b'\xad\x42\x0a\xbc' + b'\x00' * 4 + b'\x01\x27\x00\x00\x02\x54\x00\x02',
1074            f('ad42:abc::127:0:254:2')
1075        )
1076        self.assertEqual(b'\x00\x12\x00\x0a' + b'\x00' * 12, f('12:a::'))
1077        assertInvalid('0x20::')
1078        assertInvalid(':::')
1079        assertInvalid('::0::')
1080        assertInvalid('1::abc::')
1081        assertInvalid('1::abc::def')
1082        assertInvalid('1:2:3:4:5:6:')
1083        assertInvalid('1:2:3:4:5:6')
1084        assertInvalid('1:2:3:4:5:6:7:8:')
1085        assertInvalid('1:2:3:4:5:6:7:8:0')
1086
1087        self.assertEqual(b'\x00' * 12 + b'\xfe\x2a\x17\x40',
1088            f('::254.42.23.64')
1089        )
1090        self.assertEqual(
1091            b'\x00\x42' + b'\x00' * 8 + b'\xa2\x9b\xfe\x2a\x17\x40',
1092            f('42::a29b:254.42.23.64')
1093        )
1094        self.assertEqual(
1095            b'\x00\x42\xa8\xb9\x00\x00\x00\x02\xff\xff\xa2\x9b\xfe\x2a\x17\x40',
1096            f('42:a8b9:0:2:ffff:a29b:254.42.23.64')
1097        )
1098        assertInvalid('255.254.253.252')
1099        assertInvalid('1::260.2.3.0')
1100        assertInvalid('1::0.be.e.0')
1101        assertInvalid('1:2:3:4:5:6:7:1.2.3.4')
1102        assertInvalid('::1.2.3.4:0')
1103        assertInvalid('0.100.200.0:3:4:5:6:7:8')
1104
1105    @unittest.skipUnless(hasattr(socket, 'inet_ntop'),
1106                         'test needs socket.inet_ntop()')
1107    def testStringToIPv4(self):
1108        from socket import inet_ntoa as f, inet_ntop, AF_INET
1109        g = lambda a: inet_ntop(AF_INET, a)
1110        assertInvalid = lambda func,a: self.assertRaises(
1111            (OSError, ValueError), func, a
1112        )
1113
1114        self.assertEqual('1.0.1.0', f(b'\x01\x00\x01\x00'))
1115        self.assertEqual('170.85.170.85', f(b'\xaa\x55\xaa\x55'))
1116        self.assertEqual('255.255.255.255', f(b'\xff\xff\xff\xff'))
1117        self.assertEqual('1.2.3.4', f(b'\x01\x02\x03\x04'))
1118        assertInvalid(f, b'\x00' * 3)
1119        assertInvalid(f, b'\x00' * 5)
1120        assertInvalid(f, b'\x00' * 16)
1121        self.assertEqual('170.85.170.85', f(bytearray(b'\xaa\x55\xaa\x55')))
1122
1123        self.assertEqual('1.0.1.0', g(b'\x01\x00\x01\x00'))
1124        self.assertEqual('170.85.170.85', g(b'\xaa\x55\xaa\x55'))
1125        self.assertEqual('255.255.255.255', g(b'\xff\xff\xff\xff'))
1126        assertInvalid(g, b'\x00' * 3)
1127        assertInvalid(g, b'\x00' * 5)
1128        assertInvalid(g, b'\x00' * 16)
1129        self.assertEqual('170.85.170.85', g(bytearray(b'\xaa\x55\xaa\x55')))
1130
1131    @unittest.skipUnless(hasattr(socket, 'inet_ntop'),
1132                         'test needs socket.inet_ntop()')
1133    def testStringToIPv6(self):
1134        try:
1135            from socket import inet_ntop, AF_INET6, has_ipv6
1136            if not has_ipv6:
1137                self.skipTest('IPv6 not available')
1138        except ImportError:
1139            self.skipTest('could not import needed symbols from socket')
1140
1141        if sys.platform == "win32":
1142            try:
1143                inet_ntop(AF_INET6, b'\x00' * 16)
1144            except OSError as e:
1145                if e.winerror == 10022:
1146                    self.skipTest('IPv6 might not be supported')
1147
1148        f = lambda a: inet_ntop(AF_INET6, a)
1149        assertInvalid = lambda a: self.assertRaises(
1150            (OSError, ValueError), f, a
1151        )
1152
1153        self.assertEqual('::', f(b'\x00' * 16))
1154        self.assertEqual('::1', f(b'\x00' * 15 + b'\x01'))
1155        self.assertEqual(
1156            'aef:b01:506:1001:ffff:9997:55:170',
1157            f(b'\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70')
1158        )
1159        self.assertEqual('::1', f(bytearray(b'\x00' * 15 + b'\x01')))
1160
1161        assertInvalid(b'\x12' * 15)
1162        assertInvalid(b'\x12' * 17)
1163        assertInvalid(b'\x12' * 4)
1164
1165    # XXX The following don't test module-level functionality...
1166
1167    def testSockName(self):
1168        # Testing getsockname()
1169        port = support.find_unused_port()
1170        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1171        self.addCleanup(sock.close)
1172        sock.bind(("0.0.0.0", port))
1173        name = sock.getsockname()
1174        # XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate
1175        # it reasonable to get the host's addr in addition to 0.0.0.0.
1176        # At least for eCos.  This is required for the S/390 to pass.
1177        try:
1178            my_ip_addr = socket.gethostbyname(socket.gethostname())
1179        except OSError:
1180            # Probably name lookup wasn't set up right; skip this test
1181            self.skipTest('name lookup failure')
1182        self.assertIn(name[0], ("0.0.0.0", my_ip_addr), '%s invalid' % name[0])
1183        self.assertEqual(name[1], port)
1184
1185    def testGetSockOpt(self):
1186        # Testing getsockopt()
1187        # We know a socket should start without reuse==0
1188        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1189        self.addCleanup(sock.close)
1190        reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
1191        self.assertFalse(reuse != 0, "initial mode is reuse")
1192
1193    def testSetSockOpt(self):
1194        # Testing setsockopt()
1195        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1196        self.addCleanup(sock.close)
1197        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1198        reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
1199        self.assertFalse(reuse == 0, "failed to set reuse mode")
1200
1201    def testSendAfterClose(self):
1202        # testing send() after close() with timeout
1203        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1204        sock.settimeout(1)
1205        sock.close()
1206        self.assertRaises(OSError, sock.send, b"spam")
1207
1208    def testCloseException(self):
1209        sock = socket.socket()
1210        socket.socket(fileno=sock.fileno()).close()
1211        try:
1212            sock.close()
1213        except OSError as err:
1214            # Winsock apparently raises ENOTSOCK
1215            self.assertIn(err.errno, (errno.EBADF, errno.ENOTSOCK))
1216        else:
1217            self.fail("close() should raise EBADF/ENOTSOCK")
1218
1219    def testNewAttributes(self):
1220        # testing .family, .type and .protocol
1221
1222        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1223        self.assertEqual(sock.family, socket.AF_INET)
1224        if hasattr(socket, 'SOCK_CLOEXEC'):
1225            self.assertIn(sock.type,
1226                          (socket.SOCK_STREAM | socket.SOCK_CLOEXEC,
1227                           socket.SOCK_STREAM))
1228        else:
1229            self.assertEqual(sock.type, socket.SOCK_STREAM)
1230        self.assertEqual(sock.proto, 0)
1231        sock.close()
1232
1233    def test_getsockaddrarg(self):
1234        sock = socket.socket()
1235        self.addCleanup(sock.close)
1236        port = support.find_unused_port()
1237        big_port = port + 65536
1238        neg_port = port - 65536
1239        self.assertRaises(OverflowError, sock.bind, (HOST, big_port))
1240        self.assertRaises(OverflowError, sock.bind, (HOST, neg_port))
1241        # Since find_unused_port() is inherently subject to race conditions, we
1242        # call it a couple times if necessary.
1243        for i in itertools.count():
1244            port = support.find_unused_port()
1245            try:
1246                sock.bind((HOST, port))
1247            except OSError as e:
1248                if e.errno != errno.EADDRINUSE or i == 5:
1249                    raise
1250            else:
1251                break
1252
1253    @unittest.skipUnless(os.name == "nt", "Windows specific")
1254    def test_sock_ioctl(self):
1255        self.assertTrue(hasattr(socket.socket, 'ioctl'))
1256        self.assertTrue(hasattr(socket, 'SIO_RCVALL'))
1257        self.assertTrue(hasattr(socket, 'RCVALL_ON'))
1258        self.assertTrue(hasattr(socket, 'RCVALL_OFF'))
1259        self.assertTrue(hasattr(socket, 'SIO_KEEPALIVE_VALS'))
1260        s = socket.socket()
1261        self.addCleanup(s.close)
1262        self.assertRaises(ValueError, s.ioctl, -1, None)
1263        s.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 100, 100))
1264
1265    @unittest.skipUnless(os.name == "nt", "Windows specific")
1266    @unittest.skipUnless(hasattr(socket, 'SIO_LOOPBACK_FAST_PATH'),
1267                         'Loopback fast path support required for this test')
1268    def test_sio_loopback_fast_path(self):
1269        s = socket.socket()
1270        self.addCleanup(s.close)
1271        try:
1272            s.ioctl(socket.SIO_LOOPBACK_FAST_PATH, True)
1273        except OSError as exc:
1274            WSAEOPNOTSUPP = 10045
1275            if exc.winerror == WSAEOPNOTSUPP:
1276                self.skipTest("SIO_LOOPBACK_FAST_PATH is defined but "
1277                              "doesn't implemented in this Windows version")
1278            raise
1279        self.assertRaises(TypeError, s.ioctl, socket.SIO_LOOPBACK_FAST_PATH, None)
1280
1281    def testGetaddrinfo(self):
1282        try:
1283            socket.getaddrinfo('localhost', 80)
1284        except socket.gaierror as err:
1285            if err.errno == socket.EAI_SERVICE:
1286                # see http://bugs.python.org/issue1282647
1287                self.skipTest("buggy libc version")
1288            raise
1289        # len of every sequence is supposed to be == 5
1290        for info in socket.getaddrinfo(HOST, None):
1291            self.assertEqual(len(info), 5)
1292        # host can be a domain name, a string representation of an
1293        # IPv4/v6 address or None
1294        socket.getaddrinfo('localhost', 80)
1295        socket.getaddrinfo('127.0.0.1', 80)
1296        socket.getaddrinfo(None, 80)
1297        if support.IPV6_ENABLED:
1298            socket.getaddrinfo('::1', 80)
1299        # port can be a string service name such as "http", a numeric
1300        # port number or None
1301        socket.getaddrinfo(HOST, "http")
1302        socket.getaddrinfo(HOST, 80)
1303        socket.getaddrinfo(HOST, None)
1304        # test family and socktype filters
1305        infos = socket.getaddrinfo(HOST, 80, socket.AF_INET, socket.SOCK_STREAM)
1306        for family, type, _, _, _ in infos:
1307            self.assertEqual(family, socket.AF_INET)
1308            self.assertEqual(str(family), 'AddressFamily.AF_INET')
1309            self.assertEqual(type, socket.SOCK_STREAM)
1310            self.assertEqual(str(type), 'SocketKind.SOCK_STREAM')
1311        infos = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM)
1312        for _, socktype, _, _, _ in infos:
1313            self.assertEqual(socktype, socket.SOCK_STREAM)
1314        # test proto and flags arguments
1315        socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP)
1316        socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE)
1317        # a server willing to support both IPv4 and IPv6 will
1318        # usually do this
1319        socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
1320                           socket.AI_PASSIVE)
1321        # test keyword arguments
1322        a = socket.getaddrinfo(HOST, None)
1323        b = socket.getaddrinfo(host=HOST, port=None)
1324        self.assertEqual(a, b)
1325        a = socket.getaddrinfo(HOST, None, socket.AF_INET)
1326        b = socket.getaddrinfo(HOST, None, family=socket.AF_INET)
1327        self.assertEqual(a, b)
1328        a = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM)
1329        b = socket.getaddrinfo(HOST, None, type=socket.SOCK_STREAM)
1330        self.assertEqual(a, b)
1331        a = socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP)
1332        b = socket.getaddrinfo(HOST, None, proto=socket.SOL_TCP)
1333        self.assertEqual(a, b)
1334        a = socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE)
1335        b = socket.getaddrinfo(HOST, None, flags=socket.AI_PASSIVE)
1336        self.assertEqual(a, b)
1337        a = socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
1338                               socket.AI_PASSIVE)
1339        b = socket.getaddrinfo(host=None, port=0, family=socket.AF_UNSPEC,
1340                               type=socket.SOCK_STREAM, proto=0,
1341                               flags=socket.AI_PASSIVE)
1342        self.assertEqual(a, b)
1343        # Issue #6697.
1344        self.assertRaises(UnicodeEncodeError, socket.getaddrinfo, 'localhost', '\uD800')
1345
1346        # Issue 17269: test workaround for OS X platform bug segfault
1347        if hasattr(socket, 'AI_NUMERICSERV'):
1348            try:
1349                # The arguments here are undefined and the call may succeed
1350                # or fail.  All we care here is that it doesn't segfault.
1351                socket.getaddrinfo("localhost", None, 0, 0, 0,
1352                                   socket.AI_NUMERICSERV)
1353            except socket.gaierror:
1354                pass
1355
1356    def test_getnameinfo(self):
1357        # only IP addresses are allowed
1358        self.assertRaises(OSError, socket.getnameinfo, ('mail.python.org',0), 0)
1359
1360    @unittest.skipUnless(support.is_resource_enabled('network'),
1361                         'network is not enabled')
1362    def test_idna(self):
1363        # Check for internet access before running test
1364        # (issue #12804, issue #25138).
1365        with support.transient_internet('python.org'):
1366            socket.gethostbyname('python.org')
1367
1368        # these should all be successful
1369        domain = 'испытание.pythontest.net'
1370        socket.gethostbyname(domain)
1371        socket.gethostbyname_ex(domain)
1372        socket.getaddrinfo(domain,0,socket.AF_UNSPEC,socket.SOCK_STREAM)
1373        # this may not work if the forward lookup chooses the IPv6 address, as that doesn't
1374        # have a reverse entry yet
1375        # socket.gethostbyaddr('испытание.python.org')
1376
    def check_sendall_interrupted(self, with_timeout):
        # Shared helper for the test_sendall_interrupted* tests.  It
        # schedules a SIGALRM right before a large sendall() on one end of a
        # socket pair and checks which exception sendall() raises.
        # ``with_timeout`` selects whether the sending socket gets a timeout
        # (which also enables the second, socket.timeout check below).
        #
        # socketpair() is not strictly required, but it makes things easier.
        if not hasattr(signal, 'alarm') or not hasattr(socket, 'socketpair'):
            self.skipTest("signal.alarm and socket.socketpair required for this test")
        # Our signal handlers clobber the C errno by calling a math function
        # with an invalid domain value.
        def ok_handler(*args):
            self.assertRaises(ValueError, math.acosh, 0)
        def raising_handler(*args):
            self.assertRaises(ValueError, math.acosh, 0)
            # Deliberately raise ZeroDivisionError out of the handler.
            1 // 0
        c, s = socket.socketpair()
        # Remember the previous SIGALRM handler so it can be restored.
        old_alarm = signal.signal(signal.SIGALRM, raising_handler)
        try:
            if with_timeout:
                # Just above the one second minimum for signal.alarm
                c.settimeout(1.5)
            # The exception raised by the handler must propagate out of
            # sendall().
            with self.assertRaises(ZeroDivisionError):
                signal.alarm(1)
                c.sendall(b"x" * support.SOCK_MAX_SIZE)
            if with_timeout:
                # With a handler that does not raise, sendall() is expected
                # to end with socket.timeout instead.
                signal.signal(signal.SIGALRM, ok_handler)
                signal.alarm(1)
                self.assertRaises(socket.timeout, c.sendall,
                                  b"x" * support.SOCK_MAX_SIZE)
        finally:
            # Cancel any pending alarm, restore the previous handler and
            # close both ends of the pair.
            signal.alarm(0)
            signal.signal(signal.SIGALRM, old_alarm)
            c.close()
            s.close()
1407
1408    def test_sendall_interrupted(self):
1409        self.check_sendall_interrupted(False)
1410
1411    def test_sendall_interrupted_with_timeout(self):
1412        self.check_sendall_interrupted(True)
1413
    def test_dealloc_warn(self):
        # Dropping the last reference to an unclosed socket must emit a
        # ResourceWarning whose message contains the socket's repr().
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        r = repr(sock)
        with self.assertWarns(ResourceWarning) as cm:
            # Release the only reference and force a collection inside the
            # assertWarns block so the warning is captured.
            sock = None
            support.gc_collect()
        self.assertIn(r, str(cm.warning.args[0]))
        # An open socket file object gets dereferenced after the socket
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        f = sock.makefile('rb')
        # NOTE(review): this second repr is not used by the checks below.
        r = repr(sock)
        # The socket reference is dropped outside of any assertWarns block;
        # the warning is only expected once the file object is released too.
        sock = None
        support.gc_collect()
        with self.assertWarns(ResourceWarning):
            f = None
            support.gc_collect()
1430
1431    def test_name_closed_socketio(self):
1432        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
1433            fp = sock.makefile("rb")
1434            fp.close()
1435            self.assertEqual(repr(fp), "<_io.BufferedReader name=-1>")
1436
1437    def test_unusable_closed_socketio(self):
1438        with socket.socket() as sock:
1439            fp = sock.makefile("rb", buffering=0)
1440            self.assertTrue(fp.readable())
1441            self.assertFalse(fp.writable())
1442            self.assertFalse(fp.seekable())
1443            fp.close()
1444            self.assertRaises(ValueError, fp.readable)
1445            self.assertRaises(ValueError, fp.writable)
1446            self.assertRaises(ValueError, fp.seekable)
1447
1448    def test_makefile_mode(self):
1449        for mode in 'r', 'rb', 'rw', 'w', 'wb':
1450            with self.subTest(mode=mode):
1451                with socket.socket() as sock:
1452                    with sock.makefile(mode) as fp:
1453                        self.assertEqual(fp.mode, mode)
1454
1455    def test_makefile_invalid_mode(self):
1456        for mode in 'rt', 'x', '+', 'a':
1457            with self.subTest(mode=mode):
1458                with socket.socket() as sock:
1459                    with self.assertRaisesRegex(ValueError, 'invalid mode'):
1460                        sock.makefile(mode)
1461
1462    def test_pickle(self):
1463        sock = socket.socket()
1464        with sock:
1465            for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
1466                self.assertRaises(TypeError, pickle.dumps, sock, protocol)
1467        for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
1468            family = pickle.loads(pickle.dumps(socket.AF_INET, protocol))
1469            self.assertEqual(family, socket.AF_INET)
1470            type = pickle.loads(pickle.dumps(socket.SOCK_STREAM, protocol))
1471            self.assertEqual(type, socket.SOCK_STREAM)
1472
1473    def test_listen_backlog(self):
1474        for backlog in 0, -1:
1475            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
1476                srv.bind((HOST, 0))
1477                srv.listen(backlog)
1478
1479        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
1480            srv.bind((HOST, 0))
1481            srv.listen()
1482
1483    @support.cpython_only
1484    def test_listen_backlog_overflow(self):
1485        # Issue 15989
1486        import _testcapi
1487        srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1488        srv.bind((HOST, 0))
1489        self.assertRaises(OverflowError, srv.listen, _testcapi.INT_MAX + 1)
1490        srv.close()
1491
1492    @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
1493    def test_flowinfo(self):
1494        self.assertRaises(OverflowError, socket.getnameinfo,
1495                          (support.HOSTv6, 0, 0xffffffff), 0)
1496        with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s:
1497            self.assertRaises(OverflowError, s.bind, (support.HOSTv6, 0, -10))
1498
1499    def test_str_for_enums(self):
1500        # Make sure that the AF_* and SOCK_* constants have enum-like string
1501        # reprs.
1502        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
1503            self.assertEqual(str(s.family), 'AddressFamily.AF_INET')
1504            self.assertEqual(str(s.type), 'SocketKind.SOCK_STREAM')
1505
1506    @unittest.skipIf(os.name == 'nt', 'Will not work on Windows')
1507    def test_uknown_socket_family_repr(self):
1508        # Test that when created with a family that's not one of the known
1509        # AF_*/SOCK_* constants, socket.family just returns the number.
1510        #
1511        # To do this we fool socket.socket into believing it already has an
1512        # open fd because on this path it doesn't actually verify the family and
1513        # type and populates the socket object.
1514        #
1515        # On Windows this trick won't work, so the test is skipped.
1516        fd, path = tempfile.mkstemp()
1517        self.addCleanup(os.unlink, path)
1518        with socket.socket(family=42424, type=13331, fileno=fd) as s:
1519            self.assertEqual(s.family, 42424)
1520            self.assertEqual(s.type, 13331)
1521
1522    @unittest.skipUnless(hasattr(os, 'sendfile'), 'test needs os.sendfile()')
1523    def test__sendfile_use_sendfile(self):
1524        class File:
1525            def __init__(self, fd):
1526                self.fd = fd
1527
1528            def fileno(self):
1529                return self.fd
1530        with socket.socket() as sock:
1531            fd = os.open(os.curdir, os.O_RDONLY)
1532            os.close(fd)
1533            with self.assertRaises(socket._GiveupOnSendfile):
1534                sock._sendfile_use_sendfile(File(fd))
1535            with self.assertRaises(OverflowError):
1536                sock._sendfile_use_sendfile(File(2**1000))
1537            with self.assertRaises(TypeError):
1538                sock._sendfile_use_sendfile(File(None))
1539
1540
@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.')
class BasicCANTest(unittest.TestCase):
    # Single-socket checks of the CAN support: required constants, socket
    # creation, binding, and the raw-socket loopback and filter options.

    def testCrucialConstants(self):
        # Looking up each constant raises AttributeError if it is missing.
        for name in ('AF_CAN', 'PF_CAN', 'CAN_RAW'):
            getattr(socket, name)

    @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
                         'socket.CAN_BCM required for this test.')
    def testBCMConstants(self):
        # Looking up each constant raises AttributeError if it is missing.
        bcm_constants = (
            'CAN_BCM',

            # opcodes
            'CAN_BCM_TX_SETUP',     # create (cyclic) transmission task
            'CAN_BCM_TX_DELETE',    # remove (cyclic) transmission task
            'CAN_BCM_TX_READ',      # read properties of (cyclic) transmission task
            'CAN_BCM_TX_SEND',      # send one CAN frame
            'CAN_BCM_RX_SETUP',     # create RX content filter subscription
            'CAN_BCM_RX_DELETE',    # remove RX content filter subscription
            'CAN_BCM_RX_READ',      # read properties of RX content filter subscription
            'CAN_BCM_TX_STATUS',    # reply to TX_READ request
            'CAN_BCM_TX_EXPIRED',   # notification on performed transmissions (count=0)
            'CAN_BCM_RX_STATUS',    # reply to RX_READ request
            'CAN_BCM_RX_TIMEOUT',   # cyclic message is absent
            'CAN_BCM_RX_CHANGED',   # updated CAN frame (detected content change)
        )
        for name in bcm_constants:
            getattr(socket, name)

    def testCreateSocket(self):
        # Creating and closing a raw CAN socket must not raise.
        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW):
            pass

    @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
                         'socket.CAN_BCM required for this test.')
    def testCreateBCMSocket(self):
        # Creating and closing a CAN_BCM datagram socket must not raise.
        with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM):
            pass

    def testBindAny(self):
        # Binding with an empty interface name must be accepted.
        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
            any_interface = ('', )
            s.bind(any_interface)

    def testTooLongInterfaceName(self):
        # most systems limit IFNAMSIZ to 16, take 1024 to be sure
        oversized = ('x' * 1024,)
        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
            with self.assertRaisesRegex(OSError, 'interface name too long'):
                s.bind(oversized)

    @unittest.skipUnless(hasattr(socket, "CAN_RAW_LOOPBACK"),
                         'socket.CAN_RAW_LOOPBACK required for this test.')
    def testLoopback(self):
        # Each value written to CAN_RAW_LOOPBACK must be read back unchanged.
        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
            for enabled in (0, 1):
                s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK,
                             enabled)
                current = s.getsockopt(socket.SOL_CAN_RAW,
                                       socket.CAN_RAW_LOOPBACK)
                self.assertEqual(enabled, current)

    @unittest.skipUnless(hasattr(socket, "CAN_RAW_FILTER"),
                         'socket.CAN_RAW_FILTER required for this test.')
    def testFilter(self):
        # A filter written with setsockopt() must be read back unchanged,
        # and a bytearray must be accepted as the option value too.
        can_id, can_mask = 0x200, 0x700
        # Two native-order unsigned 32-bit ints: 8 bytes in total.
        packed_filter = struct.pack("=II", can_id, can_mask)
        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
            s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER,
                         packed_filter)
            stored_filter = s.getsockopt(socket.SOL_CAN_RAW,
                                         socket.CAN_RAW_FILTER, 8)
            self.assertEqual(packed_filter, stored_filter)
            s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER,
                         bytearray(packed_filter))
1608
1609
@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.')
@unittest.skipUnless(thread, 'Threading required for this test.')
class CANTest(ThreadedCANSocketTest):
    # Frame exchange tests over CAN sockets.  Every testX method has a
    # _testX counterpart; the attributes one of them stores on self
    # (cf, cf1, cf2, can_id, data) are read by the other, so the two
    # are presumably run cooperatively by ThreadedCANSocketTest, which
    # is defined elsewhere in this file.

    def __init__(self, methodName='runTest'):
        ThreadedCANSocketTest.__init__(self, methodName=methodName)

    @classmethod
    def build_can_frame(cls, can_id, data):
        """Build a CAN frame.

        Packs can_id, the length of data and data itself (NUL-padded on
        the right to 8 bytes) according to cls.can_frame_fmt and returns
        the packed bytes.
        """
        can_dlc = len(data)
        data = data.ljust(8, b'\x00')
        return struct.pack(cls.can_frame_fmt, can_id, can_dlc, data)

    @classmethod
    def dissect_can_frame(cls, frame):
        """Dissect a CAN frame.

        Returns a (can_id, can_dlc, data) tuple in which data is cut
        down to its first can_dlc bytes, dropping the padding.
        """
        can_id, can_dlc, data = struct.unpack(cls.can_frame_fmt, frame)
        return (can_id, can_dlc, data[:can_dlc])

    def testSendFrame(self):
        # Receive one frame and check both its content and the address
        # it is reported to come from.
        cf, addr = self.s.recvfrom(self.bufsize)
        self.assertEqual(self.cf, cf)
        self.assertEqual(addr[0], self.interface)
        self.assertEqual(addr[1], socket.AF_CAN)

    def _testSendFrame(self):
        self.cf = self.build_can_frame(0x00, b'\x01\x02\x03\x04\x05')
        self.cli.send(self.cf)

    def testSendMaxFrame(self):
        # Receive a frame whose payload fills all 8 data bytes.
        cf, addr = self.s.recvfrom(self.bufsize)
        self.assertEqual(self.cf, cf)

    def _testSendMaxFrame(self):
        self.cf = self.build_can_frame(0x00, b'\x07' * 8)
        self.cli.send(self.cf)

    def testSendMultiFrames(self):
        # Two frames sent back to back must arrive in sending order.
        cf, addr = self.s.recvfrom(self.bufsize)
        self.assertEqual(self.cf1, cf)

        cf, addr = self.s.recvfrom(self.bufsize)
        self.assertEqual(self.cf2, cf)

    def _testSendMultiFrames(self):
        self.cf1 = self.build_can_frame(0x07, b'\x44\x33\x22\x11')
        self.cli.send(self.cf1)

        self.cf2 = self.build_can_frame(0x12, b'\x99\x22\x33')
        self.cli.send(self.cf2)

    @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
                         'socket.CAN_BCM required for this test.')
    def _testBCM(self):
        # Receive the frame that testBCM() submits through the BCM
        # socket and compare it with the values testBCM() stored.
        cf, addr = self.cli.recvfrom(self.bufsize)
        self.assertEqual(self.cf, cf)
        can_id, can_dlc, data = self.dissect_can_frame(cf)
        self.assertEqual(self.can_id, can_id)
        self.assertEqual(self.data, data)

    @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
                         'socket.CAN_BCM required for this test.')
    def testBCM(self):
        # Send a single frame with a CAN_BCM_TX_SEND command and check
        # that the whole command (header + frame) was accepted.
        bcm = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM)
        self.addCleanup(bcm.close)
        bcm.connect((self.interface,))
        self.can_id = 0x123
        self.data = bytes([0xc0, 0xff, 0xee])
        self.cf = self.build_can_frame(self.can_id, self.data)
        opcode = socket.CAN_BCM_TX_SEND
        flags = 0
        count = 0
        ival1_seconds = ival1_usec = ival2_seconds = ival2_usec = 0
        bcm_can_id = 0x0222
        nframes = 1
        # Sanity check on the frame built above.  A unittest assertion
        # is used rather than a bare assert statement so that the check
        # is not removed under "python -O" and a failure is reported
        # with both values.
        self.assertEqual(len(self.cf), 16)
        # Header fields are passed in the order self.bcm_cmd_msg_fmt
        # packs them.
        header = struct.pack(self.bcm_cmd_msg_fmt,
                    opcode,
                    flags,
                    count,
                    ival1_seconds,
                    ival1_usec,
                    ival2_seconds,
                    ival2_usec,
                    bcm_can_id,
                    nframes,
                    )
        header_plus_frame = header + self.cf
        bytes_sent = bcm.send(header_plus_frame)
        self.assertEqual(bytes_sent, len(header_plus_frame))
1701
1702
@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.')
class BasicRDSTest(unittest.TestCase):
    # Checks on RDS sockets that do not need a peer.

    def testCrucialConstants(self):
        # Looking the names up is the whole test: a missing constant
        # raises AttributeError.
        for constant in ("AF_RDS", "PF_RDS"):
            getattr(socket, constant)

    def testCreateSocket(self):
        # Creating an RDS socket must succeed; the with statement
        # closes it again straight away.
        with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0):
            pass

    def testSocketBufferSize(self):
        # Both the receive and the send buffer size can be set.
        requested = 16384
        with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET,
                           0) as rds_sock:
            rds_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF,
                                requested)
            rds_sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF,
                                requested)
1719
1720
@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.')
@unittest.skipUnless(thread, 'Threading required for this test.')
class RDSTest(ThreadedRDSSocketTest):
    # Message exchange tests over RDS sockets: the _testX methods send
    # from self.cli and the matching testX methods receive on
    # self.serv.  Expected payloads are read from self only after the
    # corresponding receive has returned, since the sending side is
    # the one that stores them.

    def __init__(self, methodName='runTest'):
        ThreadedRDSSocketTest.__init__(self, methodName=methodName)

    def setUp(self):
        super().setUp()
        self.evt = threading.Event()

    def testSendAndRecv(self):
        payload, sender = self.serv.recvfrom(self.bufsize)
        self.assertEqual(self.data, payload)
        self.assertEqual(self.cli_addr, sender)

    def _testSendAndRecv(self):
        self.data = b'spam'
        destination = (HOST, self.port)
        self.cli.sendto(self.data, 0, destination)

    def testPeek(self):
        # A peeked message stays queued, so the next plain receive
        # returns it again.
        peeked, _ = self.serv.recvfrom(self.bufsize, socket.MSG_PEEK)
        self.assertEqual(self.data, peeked)
        consumed, _ = self.serv.recvfrom(self.bufsize)
        self.assertEqual(self.data, consumed)

    def _testPeek(self):
        self.data = b'spam'
        destination = (HOST, self.port)
        self.cli.sendto(self.data, 0, destination)

    @requireAttrs(socket.socket, 'recvmsg')
    def testSendAndRecvMsg(self):
        received = self.serv.recvmsg(self.bufsize)
        payload, ancdata, msg_flags, sender = received
        self.assertEqual(self.data, payload)

    @requireAttrs(socket.socket, 'sendmsg')
    def _testSendAndRecvMsg(self):
        self.data = b'hello ' * 10
        destination = (HOST, self.port)
        self.cli.sendmsg([self.data], (), 0, destination)

    def testSendAndRecvMulti(self):
        # Messages must arrive in the order they were sent.
        for expected_attr in ('data1', 'data2'):
            payload, _ = self.serv.recvfrom(self.bufsize)
            self.assertEqual(getattr(self, expected_attr), payload)

    def _testSendAndRecvMulti(self):
        destination = (HOST, self.port)

        self.data1 = b'bacon'
        self.cli.sendto(self.data1, 0, destination)

        self.data2 = b'egg'
        self.cli.sendto(self.data2, 0, destination)

    def testSelect(self):
        # select() must report the socket readable within 3 seconds.
        readable, _, _ = select.select([self.serv], [], [], 3.0)
        self.assertIn(self.serv, readable)
        payload, _ = self.serv.recvfrom(self.bufsize)
        self.assertEqual(self.data, payload)

    def _testSelect(self):
        self.data = b'select'
        destination = (HOST, self.port)
        self.cli.sendto(self.data, 0, destination)
1784
1785
@unittest.skipUnless(thread, 'Threading required for this test.')
class BasicTCPTest(SocketConnectedTest):
    # Basic send/receive tests on a connected TCP socket pair: the
    # _testX methods write to self.serv_conn and the testX methods read
    # from self.cli_conn.

    def __init__(self, methodName='runTest'):
        SocketConnectedTest.__init__(self, methodName=methodName)

    def testRecv(self):
        # Testing large receive over TCP
        self.assertEqual(self.cli_conn.recv(1024), MSG)

    def _testRecv(self):
        self.serv_conn.send(MSG)

    def testOverFlowRecv(self):
        # Testing receive in chunks over TCP
        head = self.cli_conn.recv(len(MSG) - 3)
        tail = self.cli_conn.recv(1024)
        self.assertEqual(head + tail, MSG)

    def _testOverFlowRecv(self):
        self.serv_conn.send(MSG)

    def testRecvFrom(self):
        # Testing large recvfrom() over TCP
        received, _ = self.cli_conn.recvfrom(1024)
        self.assertEqual(received, MSG)

    def _testRecvFrom(self):
        self.serv_conn.send(MSG)

    def testOverFlowRecvFrom(self):
        # Testing recvfrom() in chunks over TCP
        head, _ = self.cli_conn.recvfrom(len(MSG)-3)
        tail, _ = self.cli_conn.recvfrom(1024)
        self.assertEqual(head + tail, MSG)

    def _testOverFlowRecvFrom(self):
        self.serv_conn.send(MSG)

    def testSendAll(self):
        # Testing sendall() with a 2048 byte string over TCP
        # Keep reading until recv() returns b'', then compare the
        # concatenation of everything that was read.
        pieces = iter(lambda: self.cli_conn.recv(1024), b'')
        self.assertEqual(b''.join(pieces), b'f' * 2048)

    def _testSendAll(self):
        self.serv_conn.sendall(b'f' * 2048)

    def testFromFd(self):
        # Testing fromfd()
        rebuilt = socket.fromfd(self.cli_conn.fileno(),
                                socket.AF_INET, socket.SOCK_STREAM)
        self.addCleanup(rebuilt.close)
        self.assertIsInstance(rebuilt, socket.socket)
        self.assertEqual(rebuilt.recv(1024), MSG)

    def _testFromFd(self):
        self.serv_conn.send(MSG)

    def testDup(self):
        # Testing dup()
        duplicate = self.cli_conn.dup()
        self.addCleanup(duplicate.close)
        self.assertEqual(duplicate.recv(1024), MSG)

    def _testDup(self):
        self.serv_conn.send(MSG)

    def testShutdown(self):
        # Testing shutdown()
        self.assertEqual(self.cli_conn.recv(1024), MSG)
        # wait for _testShutdown to finish: on OS X, when the server
        # closes the connection the client also becomes disconnected,
        # and the client's shutdown call will fail. (Issue #4397.)
        self.done.wait()

    def _testShutdown(self):
        self.serv_conn.send(MSG)
        self.serv_conn.shutdown(2)

    # Same receiving side as testShutdown, paired with a sender that
    # also tries out-of-range shutdown() arguments.
    testShutdown_overflow = support.cpython_only(testShutdown)

    @support.cpython_only
    def _testShutdown_overflow(self):
        import _testcapi
        self.serv_conn.send(MSG)
        # Issue 15989
        out_of_range = (_testcapi.INT_MAX + 1,
                        2 + (_testcapi.UINT_MAX + 1))
        for how in out_of_range:
            self.assertRaises(OverflowError, self.serv_conn.shutdown, how)
        self.serv_conn.shutdown(2)

    def testDetach(self):
        # Testing detach()
        expected_fd = self.cli_conn.fileno()
        detached_fd = self.cli_conn.detach()
        self.assertEqual(detached_fd, expected_fd)
        # cli_conn cannot be used anymore...
        self.assertTrue(self.cli_conn._closed)
        with self.assertRaises(OSError):
            self.cli_conn.recv(1024)
        self.cli_conn.close()
        # ...but we can create another socket using the (still open)
        # file descriptor
        revived = socket.socket(socket.AF_INET, socket.SOCK_STREAM,
                                fileno=detached_fd)
        self.addCleanup(revived.close)
        self.assertEqual(revived.recv(1024), MSG)

    def _testDetach(self):
        self.serv_conn.send(MSG)
1908
@unittest.skipUnless(thread, 'Threading required for this test.')
class BasicUDPTest(ThreadedUDPSocketTest):
    # Basic datagram tests: the _testX methods send MSG from self.cli
    # and the testX methods receive on self.serv.

    def __init__(self, methodName='runTest'):
        ThreadedUDPSocketTest.__init__(self, methodName=methodName)

    def testSendtoAndRecv(self):
        # Testing sendto() and Recv() over UDP
        received = self.serv.recv(len(MSG))
        self.assertEqual(received, MSG)

    def _testSendtoAndRecv(self):
        destination = (HOST, self.port)
        self.cli.sendto(MSG, 0, destination)

    def testRecvFrom(self):
        # Testing recvfrom() over UDP
        received, _ = self.serv.recvfrom(len(MSG))
        self.assertEqual(received, MSG)

    def _testRecvFrom(self):
        destination = (HOST, self.port)
        self.cli.sendto(MSG, 0, destination)

    def testRecvFromNegative(self):
        # Negative lengths passed to recvfrom should give ValueError.
        with self.assertRaises(ValueError):
            self.serv.recvfrom(-1)

    def _testRecvFromNegative(self):
        destination = (HOST, self.port)
        self.cli.sendto(MSG, 0, destination)
1937
1938# Tests for the sendmsg()/recvmsg() interface.  Where possible, the
1939# same test code is used with different families and types of socket
1940# (e.g. stream, datagram), and tests using recvmsg() are repeated
1941# using recvmsg_into().
1942#
1943# The generic test classes such as SendmsgTests and
1944# RecvmsgGenericTests inherit from SendrecvmsgBase and expect to be
1945# supplied with sockets cli_sock and serv_sock representing the
1946# client's and the server's end of the connection respectively, and
1947# attributes cli_addr and serv_addr holding their (numeric where
1948# appropriate) addresses.
1949#
1950# The final concrete test classes combine these with subclasses of
1951# SocketTestBase which set up client and server sockets of a specific
1952# type, and with subclasses of SendrecvmsgBase such as
1953# SendrecvmsgDgramBase and SendrecvmsgConnectedBase which map these
1954# sockets to cli_sock and serv_sock and override the methods and
1955# attributes of SendrecvmsgBase to fill in destination addresses if
1956# needed when sending, check for specific flags in msg_flags, etc.
1957#
1958# RecvmsgIntoMixin provides a version of doRecvmsg() implemented using
1959# recvmsg_into().
1960
1961# XXX: like the other datagram (UDP) tests in this module, the code
1962# here assumes that datagram delivery on the local machine will be
1963# reliable.
1964
class SendrecvmsgBase(ThreadSafeCleanupTestCase):
    # Common machinery for the sendmsg()/recvmsg() test classes.

    # Number of seconds after which a test is treated as failed; None
    # means no timeout.  Only some of the tests set a timeout at all.
    fail_timeout = 3.0

    def setUp(self):
        # Create the general-purpose event first, then run the parent
        # setUp().
        self.misc_event = threading.Event()
        super().setUp()

    def sendToServer(self, msg):
        # Plain send() of msg on the client socket.
        client = self.cli_sock
        return client.send(msg)

    # Fallback positional arguments for sendmsg() when it is invoked
    # through sendmsgToServer(); subclasses override this, e.g. to
    # supply a destination address.
    sendmsg_to_server_defaults = ()

    def sendmsgToServer(self, *args):
        # Invoke sendmsg() on self.cli_sock.  Positions the caller did
        # not supply are taken from the matching positions of
        # self.sendmsg_to_server_defaults, when it has any.
        sendmsg = self.cli_sock.sendmsg
        fallback = self.sendmsg_to_server_defaults[len(args):]
        return sendmsg(*(args + fallback))

    def doRecvmsg(self, sock, bufsize, *args):
        # Receive with recvmsg() on sock and hand back what it returns.
        # Tests that work equally well with recvmsg_into() should call
        # this method: RecvmsgIntoMixin replaces it with an emulation
        # built on recvmsg_into(), so one test body covers both calls.
        received = sock.recvmsg(bufsize, *args)
        self.registerRecvmsgResult(received)
        return received

    def registerRecvmsgResult(self, result):
        # Hook invoked by doRecvmsg() with whatever recvmsg() or
        # recvmsg_into() returned.  Subclasses may override it, for
        # example to schedule cleanup based on received ancillary data.
        pass

    def checkRecvmsgAddress(self, addr1, addr2):
        # Compare the address reported by the receive call with the
        # peer's address.
        self.assertEqual(addr1, addr2)

    # Flags that are normally unset in msg_flags
    msg_flags_common_unset = 0
    for name in ("MSG_CTRUNC", "MSG_OOB"):
        # A flag the platform does not define contributes nothing.
        msg_flags_common_unset |= getattr(socket, name, 0)

    # Flags that are normally set
    msg_flags_common_set = 0

    # Flags that mark reception of a complete record (e.g. MSG_EOR
    # for SCTP)
    msg_flags_eor_indicator = 0

    # Flags that mark reception of an incomplete record
    # (e.g. MSG_TRUNC for datagram sockets)
    msg_flags_non_eor_indicator = 0

    def checkFlags(self, flags, eor=None, checkset=0, checkunset=0, ignore=0):
        # Verify the msg_flags value returned by recvmsg[_into]().
        #
        # By default every bit of the msg_flags_common_set attribute
        # has to be set in "flags" and every bit of
        # msg_flags_common_unset has to be clear.
        #
        # "eor" states whether "flags" should describe a completely
        # received record (or datagram):
        #
        #  * None: nothing extra is checked
        #
        #  * true: the msg_flags_eor_indicator bits must be set and
        #    the msg_flags_non_eor_indicator bits must be clear
        #
        #  * false: the msg_flags_non_eor_indicator bits must be set
        #    and the msg_flags_eor_indicator bits must be clear
        #
        # "checkset" and "checkunset" name bits that must be set or
        # clear respectively; for those bits they take precedence over
        # what the attributes ask for.
        #
        # Bits present in "ignore" are never checked, whatever the
        # other inputs say.
        #
        # Raises Exception when some bit that is not ignored would
        # have to be both set and clear.

        expect_set = self.msg_flags_common_set
        expect_clear = self.msg_flags_common_unset

        if eor is not None:
            if eor:
                expect_set |= self.msg_flags_eor_indicator
                expect_clear |= self.msg_flags_non_eor_indicator
            else:
                expect_set |= self.msg_flags_non_eor_indicator
                expect_clear |= self.msg_flags_eor_indicator

        # Explicit arguments win over the defaults: a default is kept
        # only where the caller did not ask for the opposite.
        must_be_set = checkset | (expect_set & ~checkunset)
        must_be_clear = checkunset | (expect_clear & ~checkset)

        # Whatever is still demanded both ways is a contradiction.
        conflict = must_be_set & must_be_clear & ~ignore
        if conflict:
            raise Exception("contradictory set, unset requirements for flags "
                            "{0:#x}".format(conflict))

        # Restrict the comparison to the bits that are actually checked.
        relevant = (must_be_set | must_be_clear) & ~ignore
        self.assertEqual(flags & relevant, must_be_set & relevant)
2082
2083
class RecvmsgIntoMixin(SendrecvmsgBase):
    # Mixin whose doRecvmsg() is built on recvmsg_into().

    def doRecvmsg(self, sock, bufsize, *args):
        # Receive into a single bytearray of bufsize bytes and return
        # a tuple shaped like recvmsg()'s result: the received bytes
        # followed by the remaining items from recvmsg_into().
        storage = bytearray(bufsize)
        outcome = sock.recvmsg_into([storage], *args)
        self.registerRecvmsgResult(outcome)
        nbytes = outcome[0]
        # The reported length has to fit the buffer handed in.
        self.assertGreaterEqual(nbytes, 0)
        self.assertLessEqual(nbytes, bufsize)
        payload = bytes(storage[:nbytes])
        return (payload,) + outcome[1:]
2094
2095
class SendrecvmsgDgramFlagsBase(SendrecvmsgBase):
    # Supplies the msg_flags expectations for datagram sockets.

    @property
    def msg_flags_non_eor_indicator(self):
        # Inherited indicator bits plus MSG_TRUNC.
        inherited = super().msg_flags_non_eor_indicator
        return inherited | socket.MSG_TRUNC
2102
2103
class SendrecvmsgSCTPFlagsBase(SendrecvmsgBase):
    # Supplies the msg_flags expectations for SCTP sockets.

    @property
    def msg_flags_eor_indicator(self):
        # Inherited indicator bits plus MSG_EOR.
        inherited = super().msg_flags_eor_indicator
        return inherited | socket.MSG_EOR
2110
2111
class SendrecvmsgConnectionlessBase(SendrecvmsgBase):
    # Base class for tests on connectionless-mode sockets.  The
    # concrete class has to provide the sockets as attributes cli and
    # serv; they are exposed here as cli_sock and serv_sock.

    @property
    def serv_sock(self):
        # The server's socket is used as is.
        return self.serv

    @property
    def cli_sock(self):
        # The client's socket is used as is.
        return self.cli

    @property
    def sendmsg_to_server_defaults(self):
        # Empty buffers, empty ancillary data and zero flags, with the
        # server's address as the destination.
        destination = self.serv_addr
        return ([], [], 0, destination)

    def sendToServer(self, msg):
        # Send with sendto(), passing the server's address explicitly.
        sendto = self.cli_sock.sendto
        return sendto(msg, self.serv_addr)
2131
2132
class SendrecvmsgConnectedBase(SendrecvmsgBase):
    # Base class for tests on connected sockets.  The concrete class
    # has to provide attributes serv_conn and cli_conn (the connections
    # *to* the server and *to* the client); they are exposed here as
    # cli_sock and serv_sock respectively.

    @property
    def serv_sock(self):
        # The server's end is the connection leading to the client.
        server_end = self.cli_conn
        return server_end

    @property
    def cli_sock(self):
        # The client's end is the connection leading to the server.
        client_end = self.serv_conn
        return client_end

    def checkRecvmsgAddress(self, addr1, addr2):
        # The address is currently "unspecified" for a connected
        # socket, so it is deliberately not examined.
        pass
2151
2152
class SendrecvmsgServerTimeoutBase(SendrecvmsgBase):
    # Base class which puts a timeout on the server's socket.

    def setUp(self):
        # The sockets come from the parent setUp(), so run it first and
        # only then apply fail_timeout to the server side.
        super().setUp()
        server = self.serv_sock
        server.settimeout(self.fail_timeout)
2159
2160
class SendmsgTests(SendrecvmsgServerTimeoutBase):
    # sendmsg() tests that work with every socket type and need
    # neither recvmsg() nor recvmsg_into().

    def testSendmsg(self):
        # Send a simple message with sendmsg().
        received = self.serv_sock.recv(len(MSG))
        self.assertEqual(received, MSG)

    def _testSendmsg(self):
        sent = self.sendmsgToServer([MSG])
        self.assertEqual(sent, len(MSG))

    def testSendmsgDataGenerator(self):
        # Send from buffer obtained from a generator (not a sequence).
        received = self.serv_sock.recv(len(MSG))
        self.assertEqual(received, MSG)

    def _testSendmsgDataGenerator(self):
        buffers = (o for o in [MSG])
        sent = self.sendmsgToServer(buffers)
        self.assertEqual(sent, len(MSG))

    def testSendmsgAncillaryGenerator(self):
        # Gather (empty) ancillary data from a generator.
        received = self.serv_sock.recv(len(MSG))
        self.assertEqual(received, MSG)

    def _testSendmsgAncillaryGenerator(self):
        sent = self.sendmsgToServer([MSG], (o for o in []))
        self.assertEqual(sent, len(MSG))

    def testSendmsgArray(self):
        # Send data from an array instead of the usual bytes object.
        received = self.serv_sock.recv(len(MSG))
        self.assertEqual(received, MSG)

    def _testSendmsgArray(self):
        sent = self.sendmsgToServer([array.array("B", MSG)])
        self.assertEqual(sent, len(MSG))

    def testSendmsgGather(self):
        # Send message data from more than one buffer (gather write).
        received = self.serv_sock.recv(len(MSG))
        self.assertEqual(received, MSG)

    def _testSendmsgGather(self):
        sent = self.sendmsgToServer([MSG[:3], MSG[3:]])
        self.assertEqual(sent, len(MSG))

    def testSendmsgBadArgs(self):
        # Check that sendmsg() rejects invalid arguments.
        self.assertEqual(self.serv_sock.recv(1000), b"done")

    def _testSendmsgBadArgs(self):
        # Calling sendmsg() without any argument is an error as well.
        self.assertRaises(TypeError, self.cli_sock.sendmsg)
        # Every tuple below is an argument list that must be rejected.
        rejected_calls = (
            (b"not in an iterable",),
            (object(),),
            ([object()],),
            ([MSG, object()],),
            ([MSG], object()),
            ([MSG], [], object()),
            ([MSG], [], 0, object()),
        )
        for call_args in rejected_calls:
            self.assertRaises(TypeError, self.sendmsgToServer, *call_args)
        self.sendToServer(b"done")

    def testSendmsgBadCmsg(self):
        # Check that invalid ancillary data items are rejected.
        self.assertEqual(self.serv_sock.recv(1000), b"done")

    def _testSendmsgBadCmsg(self):
        # Each entry is sent as the only ancillary data item.
        rejected_items = (
            object(),
            (object(), 0, b"data"),
            (0, object(), b"data"),
            (0, 0, object()),
            (0, 0),
            (0, 0, b"data", 42),
        )
        for item in rejected_items:
            self.assertRaises(TypeError, self.sendmsgToServer,
                              [MSG], [item])
        self.sendToServer(b"done")

    @requireAttrs(socket, "CMSG_SPACE")
    def testSendmsgBadMultiCmsg(self):
        # Check that invalid ancillary data items are rejected when
        # more than one item is present.
        self.assertEqual(self.serv_sock.recv(1000), b"done")

    @testSendmsgBadMultiCmsg.client_skip
    def _testSendmsgBadMultiCmsg(self):
        rejected_ancdata = (
            [0, 0, b""],
            [(0, 0, b""), object()],
        )
        for ancdata in rejected_ancdata:
            self.assertRaises(TypeError, self.sendmsgToServer,
                              [MSG], ancdata)
        self.sendToServer(b"done")

    def testSendmsgExcessCmsgReject(self):
        # Check that sendmsg() rejects excess ancillary data items
        # when the number that can be sent is limited.
        self.assertEqual(self.serv_sock.recv(1000), b"done")

    def _testSendmsgExcessCmsgReject(self):
        single_item_only = not hasattr(socket, "CMSG_SPACE")
        if single_item_only:
            # Can only send one item
            two_items = [(0, 0, b""), (0, 0, b"")]
            with self.assertRaises(OSError) as caught:
                self.sendmsgToServer([MSG], two_items)
            self.assertIsNone(caught.exception.errno)
        self.sendToServer(b"done")

    def testSendmsgAfterClose(self):
        # Check that sendmsg() fails on a closed socket.
        pass

    def _testSendmsgAfterClose(self):
        self.cli_sock.close()
        with self.assertRaises(OSError):
            self.sendmsgToServer([MSG])
2278
2279
class SendmsgStreamTests(SendmsgTests):
    # sendmsg() tests that need a stream socket but neither recvmsg()
    # nor recvmsg_into().

    def testSendmsgExplicitNoneAddr(self):
        # Check that peer address can be specified as None.
        received = self.serv_sock.recv(len(MSG))
        self.assertEqual(received, MSG)

    def _testSendmsgExplicitNoneAddr(self):
        sent = self.sendmsgToServer([MSG], [], 0, None)
        self.assertEqual(sent, len(MSG))

    def testSendmsgTimeout(self):
        # Check that timeout works with sendmsg().
        first_block = self.serv_sock.recv(512)
        self.assertEqual(first_block, b"a"*512)
        # Wait until the sending side signals that it has finished.
        finished = self.misc_event.wait(timeout=self.fail_timeout)
        self.assertTrue(finished)

    def _testSendmsgTimeout(self):
        # Keep sending with a short timeout until the send gives up;
        # the event is set no matter how the attempt ends.
        try:
            self.cli_sock.settimeout(0.03)
            try:
                while True:
                    self.sendmsgToServer([b"a"*512])
            except socket.timeout:
                pass
            except OSError as exc:
                # bpo-33937 the test randomly fails on Travis CI with
                # "OSError: [Errno 12] Cannot allocate memory"
                if exc.errno != errno.ENOMEM:
                    raise
            else:
                self.fail("socket.timeout not raised")
        finally:
            self.misc_event.set()

    # XXX: would be nice to have more tests for sendmsg flags argument.

    # Linux supports MSG_DONTWAIT when sending, but in general, it
    # only works when receiving.  Could add other platforms if they
    # support it too.
    @skipWithClientIf(sys.platform not in {"linux"},
                      "MSG_DONTWAIT not known to work on this platform when "
                      "sending")
    def testSendmsgDontWait(self):
        # Check that MSG_DONTWAIT in flags causes non-blocking behaviour.
        first_block = self.serv_sock.recv(512)
        self.assertEqual(first_block, b"a"*512)
        # Wait until the sending side signals that it has finished.
        finished = self.misc_event.wait(timeout=self.fail_timeout)
        self.assertTrue(finished)

    @testSendmsgDontWait.client_skip
    def _testSendmsgDontWait(self):
        # Keep sending with MSG_DONTWAIT until a send raises OSError;
        # the event is set no matter how the attempt ends.
        try:
            with self.assertRaises(OSError) as caught:
                while True:
                    self.sendmsgToServer([b"a"*512], [], socket.MSG_DONTWAIT)
            # bpo-33937: catch also ENOMEM, the test randomly fails on Travis CI
            # with "OSError: [Errno 12] Cannot allocate memory"
            acceptable = (errno.EAGAIN, errno.EWOULDBLOCK, errno.ENOMEM)
            self.assertIn(caught.exception.errno, acceptable)
        finally:
            self.misc_event.set()
2339
2340
class SendmsgConnectionlessTests(SendmsgTests):
    # sendmsg() tests that need a connectionless-mode (e.g. datagram)
    # socket but neither recvmsg() nor recvmsg_into().

    def testSendmsgNoDestAddr(self):
        # Check that sendmsg() fails when no destination address is
        # given for unconnected socket.
        pass

    def _testSendmsgNoDestAddr(self):
        # sendmsg() is called on the socket directly, bypassing
        # sendmsgToServer(), so that no default address is filled in.
        without_destination = (
            ([MSG],),
            ([MSG], [], 0, None),
        )
        for call_args in without_destination:
            self.assertRaises(OSError, self.cli_sock.sendmsg, *call_args)
2356
2357
class RecvmsgGenericTests(SendrecvmsgBase):
    # Receive-side tests that work with any socket type.  Every
    # receive goes through doRecvmsg(), so the same tests can be run
    # with recvmsg() or with an emulation built on recvmsg_into().

    def testRecvmsg(self):
        # Plain receive of a single message using recvmsg[_into]().
        result = self.doRecvmsg(self.serv_sock, len(MSG))
        data, cmsgs, msg_flags, sender = result
        self.assertEqual(data, MSG)
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.assertEqual(cmsgs, [])
        self.checkFlags(msg_flags, eor=True)

    def _testRecvmsg(self):
        self.sendToServer(MSG)

    def testRecvmsgExplicitDefaults(self):
        # Same as above, but spell out the default values of the
        # optional recvmsg[_into]() arguments.
        result = self.doRecvmsg(self.serv_sock, len(MSG), 0, 0)
        data, cmsgs, msg_flags, sender = result
        self.assertEqual(data, MSG)
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.assertEqual(cmsgs, [])
        self.checkFlags(msg_flags, eor=True)

    def _testRecvmsgExplicitDefaults(self):
        self.sendToServer(MSG)

    def testRecvmsgShorter(self):
        # The buffer is larger than the incoming message.
        result = self.doRecvmsg(self.serv_sock, len(MSG) + 42)
        data, cmsgs, msg_flags, sender = result
        self.assertEqual(data, MSG)
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.assertEqual(cmsgs, [])
        self.checkFlags(msg_flags, eor=True)

    def _testRecvmsgShorter(self):
        self.sendToServer(MSG)

    # Issue #13001: on FreeBSD older than 8 the MSG_TRUNC flag is not
    # reliably set when a datagram is truncated on receipt.
    @support.requires_freebsd_version(8)
    def testRecvmsgTrunc(self):
        # Ask for fewer bytes than were sent and look for the signs
        # of truncation.
        result = self.doRecvmsg(self.serv_sock, len(MSG) - 3)
        data, cmsgs, msg_flags, sender = result
        self.assertEqual(data, MSG[:-3])
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.assertEqual(cmsgs, [])
        self.checkFlags(msg_flags, eor=False)

    @support.requires_freebsd_version(8)
    def _testRecvmsgTrunc(self):
        self.sendToServer(MSG)

    def testRecvmsgShortAncillaryBuf(self):
        # Ancillary buffer of one byte: too small for any ancillary
        # data item.
        result = self.doRecvmsg(self.serv_sock, len(MSG), 1)
        data, cmsgs, msg_flags, sender = result
        self.assertEqual(data, MSG)
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.assertEqual(cmsgs, [])
        self.checkFlags(msg_flags, eor=True)

    def _testRecvmsgShortAncillaryBuf(self):
        self.sendToServer(MSG)

    def testRecvmsgLongAncillaryBuf(self):
        # Generously sized ancillary buffer.
        result = self.doRecvmsg(self.serv_sock, len(MSG), 10240)
        data, cmsgs, msg_flags, sender = result
        self.assertEqual(data, MSG)
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.assertEqual(cmsgs, [])
        self.checkFlags(msg_flags, eor=True)

    def _testRecvmsgLongAncillaryBuf(self):
        self.sendToServer(MSG)

    def testRecvmsgAfterClose(self):
        # recvmsg[_into]() on a socket that has been closed must fail.
        self.serv_sock.close()
        with self.assertRaises(OSError):
            self.doRecvmsg(self.serv_sock, 1024)

    def _testRecvmsgAfterClose(self):
        pass

    def testRecvmsgTimeout(self):
        # A receive with a short timeout and nothing to read must
        # raise socket.timeout.
        try:
            self.serv_sock.settimeout(0.03)
            with self.assertRaises(socket.timeout):
                self.doRecvmsg(self.serv_sock, len(MSG))
        finally:
            # Always release the paired _test method, which waits on
            # this event.
            self.misc_event.set()

    def _testRecvmsgTimeout(self):
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))

    @requireAttrs(socket, "MSG_PEEK")
    def testRecvmsgPeek(self):
        # With MSG_PEEK in the flags, pending data can be inspected
        # without being consumed.

        # Peek at all but the last three bytes.
        result = self.doRecvmsg(self.serv_sock, len(MSG) - 3, 0,
                                socket.MSG_PEEK)
        data, cmsgs, msg_flags, sender = result
        self.assertEqual(data, MSG[:-3])
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.assertEqual(cmsgs, [])
        # MSG_TRUNC is ignored at this point so that the test is the
        # same for stream and datagram sockets.  Some POSIX wording
        # seems to suggest the flag needn't be set when peeking, but
        # that may just be a slip.
        self.checkFlags(msg_flags, eor=False,
                        ignore=getattr(socket, "MSG_TRUNC", 0))

        # Peek at the complete message.
        result = self.doRecvmsg(self.serv_sock, len(MSG), 0,
                                socket.MSG_PEEK)
        data, cmsgs, msg_flags, sender = result
        self.assertEqual(data, MSG)
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.assertEqual(cmsgs, [])
        self.checkFlags(msg_flags, eor=True)

        # An ordinary receive must still deliver the same data.
        result = self.doRecvmsg(self.serv_sock, len(MSG))
        data, cmsgs, msg_flags, sender = result
        self.assertEqual(data, MSG)
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.assertEqual(cmsgs, [])
        self.checkFlags(msg_flags, eor=True)

    @testRecvmsgPeek.client_skip
    def _testRecvmsgPeek(self):
        self.sendToServer(MSG)

    @requireAttrs(socket.socket, "sendmsg")
    def testRecvmsgFromSendmsg(self):
        # Receive with recvmsg[_into]() a message that the peer sent
        # with sendmsg().
        self.serv_sock.settimeout(self.fail_timeout)
        result = self.doRecvmsg(self.serv_sock, len(MSG))
        data, cmsgs, msg_flags, sender = result
        self.assertEqual(data, MSG)
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.assertEqual(cmsgs, [])
        self.checkFlags(msg_flags, eor=True)

    @testRecvmsgFromSendmsg.client_skip
    def _testRecvmsgFromSendmsg(self):
        # Send MSG as two buffers in a single sendmsg() call.
        sent = self.sendmsgToServer([MSG[:3], MSG[3:]])
        self.assertEqual(sent, len(MSG))
2510
2511
class RecvmsgGenericStreamTests(RecvmsgGenericTests):
    # Stream-socket-only tests that can be run with either recvmsg()
    # or recvmsg_into().

    def testRecvmsgEOF(self):
        # After the peer closes its socket, a receive yields the
        # end-of-stream indicator b"".
        result = self.doRecvmsg(self.serv_sock, 1024)
        data, cmsgs, msg_flags, sender = result
        self.assertEqual(data, b"")
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.assertEqual(cmsgs, [])
        # An end-of-record marker may or may not be present.
        self.checkFlags(msg_flags, eor=None)

    def _testRecvmsgEOF(self):
        self.cli_sock.close()

    def testRecvmsgOverflow(self):
        # Read one message in two pieces: first all but the last
        # three bytes, then whatever is left.
        result = self.doRecvmsg(self.serv_sock, len(MSG) - 3)
        head, cmsgs, msg_flags, sender = result
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.assertEqual(cmsgs, [])
        self.checkFlags(msg_flags, eor=False)

        result = self.doRecvmsg(self.serv_sock, 1024)
        tail, cmsgs, msg_flags, sender = result
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.assertEqual(cmsgs, [])
        self.checkFlags(msg_flags, eor=True)

        # The two pieces together must reproduce the message.
        self.assertEqual(head + tail, MSG)

    def _testRecvmsgOverflow(self):
        self.sendToServer(MSG)
2545
2546
class RecvmsgTests(RecvmsgGenericTests):
    # recvmsg()-specific tests that can run on any socket type.

    def testRecvmsgBadArgs(self):
        # recvmsg() must reject invalid arguments, and a valid call
        # made afterwards must still receive the message.
        recvmsg = self.serv_sock.recvmsg

        # No arguments at all.
        with self.assertRaises(TypeError):
            recvmsg()
        # Negative buffer size.
        with self.assertRaises(ValueError):
            recvmsg(-1, 0, 0)
        # Negative ancillary buffer size.
        with self.assertRaises(ValueError):
            recvmsg(len(MSG), -1, 0)
        # Buffer size given as a list or as an arbitrary object.
        with self.assertRaises(TypeError):
            recvmsg([bytearray(10)], 0, 0)
        with self.assertRaises(TypeError):
            recvmsg(object(), 0, 0)
        # Arbitrary object as ancillary buffer size, then as flags.
        with self.assertRaises(TypeError):
            recvmsg(len(MSG), object(), 0)
        with self.assertRaises(TypeError):
            recvmsg(len(MSG), 0, object())

        data, cmsgs, msg_flags, sender = recvmsg(len(MSG), 0, 0)
        self.assertEqual(data, MSG)
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.assertEqual(cmsgs, [])
        self.checkFlags(msg_flags, eor=True)

    def _testRecvmsgBadArgs(self):
        self.sendToServer(MSG)
2574
2575
class RecvmsgIntoTests(RecvmsgIntoMixin, RecvmsgGenericTests):
    # recvmsg_into()-specific tests that can run on any socket type.

    def testRecvmsgIntoBadArgs(self):
        # recvmsg_into() must reject invalid arguments, and a valid
        # call made afterwards must still receive the message.
        recvmsg_into = self.serv_sock.recvmsg_into
        target = bytearray(len(MSG))

        # No arguments at all.
        with self.assertRaises(TypeError):
            recvmsg_into()
        # An integer, then a bare bytearray, in place of the
        # iterable of buffers.
        with self.assertRaises(TypeError):
            recvmsg_into(len(MSG), 0, 0)
        with self.assertRaises(TypeError):
            recvmsg_into(target, 0, 0)
        # Items that are not writable buffers.
        with self.assertRaises(TypeError):
            recvmsg_into([object()], 0, 0)
        with self.assertRaises(TypeError):
            recvmsg_into([b"I'm not writable"], 0, 0)
        with self.assertRaises(TypeError):
            recvmsg_into([target, object()], 0, 0)
        # Negative ancillary buffer size.
        with self.assertRaises(ValueError):
            recvmsg_into([target], -1, 0)
        # Arbitrary object as ancillary buffer size, then as flags.
        with self.assertRaises(TypeError):
            recvmsg_into([target], object(), 0)
        with self.assertRaises(TypeError):
            recvmsg_into([target], 0, object())

        count, cmsgs, msg_flags, sender = recvmsg_into([target], 0, 0)
        self.assertEqual(count, len(MSG))
        self.assertEqual(target, bytearray(MSG))
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.assertEqual(cmsgs, [])
        self.checkFlags(msg_flags, eor=True)

    def _testRecvmsgIntoBadArgs(self):
        self.sendToServer(MSG)

    def testRecvmsgIntoGenerator(self):
        # The buffers are supplied by a generator rather than by a
        # sequence.
        target = bytearray(len(MSG))
        buffers = (item for item in [target])
        result = self.serv_sock.recvmsg_into(buffers)
        count, cmsgs, msg_flags, sender = result
        self.assertEqual(count, len(MSG))
        self.assertEqual(target, bytearray(MSG))
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.assertEqual(cmsgs, [])
        self.checkFlags(msg_flags, eor=True)

    def _testRecvmsgIntoGenerator(self):
        self.sendToServer(MSG)

    def testRecvmsgIntoArray(self):
        # The target is an array.array of unsigned bytes instead of
        # the usual bytearray.
        target = array.array("B", [0] * len(MSG))
        result = self.serv_sock.recvmsg_into([target])
        count, cmsgs, msg_flags, sender = result
        self.assertEqual(count, len(MSG))
        self.assertEqual(target.tobytes(), MSG)
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.assertEqual(cmsgs, [])
        self.checkFlags(msg_flags, eor=True)

    def _testRecvmsgIntoArray(self):
        self.sendToServer(MSG)

    def testRecvmsgIntoScatter(self):
        # Scatter one message over several buffers, the middle one
        # being a slice of a larger bytearray.
        first = bytearray(b"----")
        middle = bytearray(b"0123456789")
        last = bytearray(b"--------------")
        result = self.serv_sock.recvmsg_into(
            [first, memoryview(middle)[2:9], last])
        count, cmsgs, msg_flags, sender = result
        self.assertEqual(count, len(b"Mary had a little lamb"))
        self.assertEqual(first, bytearray(b"Mary"))
        # Only the sliced region of the middle buffer is overwritten.
        self.assertEqual(middle, bytearray(b"01 had a 9"))
        self.assertEqual(last, bytearray(b"little lamb---"))
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.assertEqual(cmsgs, [])
        self.checkFlags(msg_flags, eor=True)

    def _testRecvmsgIntoScatter(self):
        self.sendToServer(b"Mary had a little lamb")
2654
2655
class CmsgMacroTests(unittest.TestCase):
    # Exercise the functions CMSG_LEN() and CMSG_SPACE().  sendmsg()
    # and recvmsg[_into]() share code with these functions, so the
    # assumptions they rely on are verified here.

    # Match the definition in socketmodule.c
    try:
        import _testcapi
    except ImportError:
        socklen_t_limit = 0x7fffffff
    else:
        socklen_t_limit = min(0x7fffffff, _testcapi.INT_MAX)

    @requireAttrs(socket, "CMSG_LEN")
    def testCMSG_LEN(self):
        # Feed CMSG_LEN() a range of valid and invalid sizes and
        # verify the properties recvmsg() and sendmsg() depend on.
        limit = self.socklen_t_limit
        header_len = socket.CMSG_LEN(0)
        # Smallest data size whose total length exceeds the limit.
        too_big = limit - header_len + 1
        sizes = itertools.chain(range(257),
                                range(too_big - 257, too_big))

        # struct cmsghdr has at least three members, two of which
        # are ints.
        self.assertGreater(header_len, SIZEOF_INT * 2)
        for size in sizes:
            total = socket.CMSG_LEN(size)
            # recvmsg() derives the data size with this subtraction.
            self.assertEqual(total - header_len, size)
            self.assertLessEqual(total, limit)

        with self.assertRaises(OverflowError):
            socket.CMSG_LEN(-1)
        # sendmsg() shares code with these functions and requires
        # that values over the limit be rejected.
        with self.assertRaises(OverflowError):
            socket.CMSG_LEN(too_big)
        with self.assertRaises(OverflowError):
            socket.CMSG_LEN(sys.maxsize)

    @requireAttrs(socket, "CMSG_SPACE")
    def testCMSG_SPACE(self):
        # Feed CMSG_SPACE() a range of valid and invalid sizes and
        # verify the properties sendmsg() depends on.
        limit = self.socklen_t_limit
        too_big = limit - socket.CMSG_SPACE(1) + 1
        sizes = itertools.chain(range(257),
                                range(too_big - 257, too_big))

        previous = socket.CMSG_SPACE(0)
        # struct cmsghdr has at least three members, two of which
        # are ints.
        self.assertGreater(previous, SIZEOF_INT * 2)
        for size in sizes:
            space = socket.CMSG_SPACE(size)
            # The result never decreases as the sizes are walked in
            # increasing order ...
            self.assertGreaterEqual(space, previous)
            # ... and is at least CMSG_LEN() of the same size.
            self.assertGreaterEqual(space, socket.CMSG_LEN(size))
            self.assertGreaterEqual(space, size + socket.CMSG_LEN(0))
            self.assertLessEqual(space, limit)
            previous = space

        with self.assertRaises(OverflowError):
            socket.CMSG_SPACE(-1)
        # sendmsg() shares code with these functions and requires
        # that values over the limit be rejected.
        with self.assertRaises(OverflowError):
            socket.CMSG_SPACE(too_big)
        with self.assertRaises(OverflowError):
            socket.CMSG_SPACE(sys.maxsize)
2713
2714
class SCMRightsTest(SendrecvmsgServerTimeoutBase):
    # File descriptor passing over Unix-domain sockets.

    # A deliberately invalid descriptor value, chosen so that it is
    # unlikely to turn into a real FD even if one of its bytes were
    # replaced with a different value (which shouldn't actually
    # happen).
    badfd = -0x5555

    def newFDs(self, n):
        # Create n new files and return their descriptors as a list.
        # Each file contains its own list index written as an ASCII
        # number.
        descriptors = []
        for index in range(n):
            handle, filename = tempfile.mkstemp()
            self.addCleanup(os.unlink, filename)
            self.addCleanup(os.close, handle)
            os.write(handle, str(index).encode())
            descriptors.append(handle)
        return descriptors

    def checkFDs(self, fds):
        # Verify that each descriptor in fds refers to a file whose
        # content is that descriptor's list index as an ASCII number.
        for index, fd in enumerate(fds):
            os.lseek(fd, 0, os.SEEK_SET)
            content = os.read(fd, 1024)
            self.assertEqual(content, str(index).encode())

    def registerRecvmsgResult(self, result):
        self.addCleanup(self.closeRecvmsgFDs, result)

    def closeRecvmsgFDs(self, recvmsg_result):
        # Close every file descriptor found in the ancillary data of
        # a value returned by recvmsg() or recvmsg_into().
        for level, kind, payload in recvmsg_result[1]:
            if level != socket.SOL_SOCKET or kind != socket.SCM_RIGHTS:
                continue
            received = array.array("i")
            # Drop any trailing bytes that do not form a whole int.
            whole = len(payload) - len(payload) % received.itemsize
            received.frombytes(payload[:whole])
            for fd in received:
                os.close(fd)

    def createAndSendFDs(self, n):
        # Create n descriptors with newFDs() and send them to the
        # server as ancillary data, with the constant MSG as the
        # ordinary data.
        ancillary = [(socket.SOL_SOCKET,
                      socket.SCM_RIGHTS,
                      array.array("i", self.newFDs(n)))]
        sent = self.sendmsgToServer([MSG], ancillary)
        self.assertEqual(sent, len(MSG))

    def checkRecvmsgFDs(self, numfds, result, maxcmsgs=1, ignoreflags=0):
        # Verify that result carries the constant MSG together with
        # numfds file descriptors spread over at most maxcmsgs
        # control messages, each holding only complete integers.
        # MSG_CTRUNC must be unset unless it is listed in
        # ignoreflags, whose flags are not examined.
        data, cmsgs, msg_flags, sender = result
        self.assertEqual(data, MSG)
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.checkFlags(msg_flags, eor=True, checkunset=socket.MSG_CTRUNC,
                        ignore=ignoreflags)

        self.assertIsInstance(cmsgs, list)
        self.assertLessEqual(len(cmsgs), maxcmsgs)
        received = array.array("i")
        for entry in cmsgs:
            self.assertIsInstance(entry, tuple)
            level, kind, payload = entry
            self.assertEqual(level, socket.SOL_SOCKET)
            self.assertEqual(kind, socket.SCM_RIGHTS)
            self.assertIsInstance(payload, bytes)
            self.assertEqual(len(payload) % SIZEOF_INT, 0)
            received.frombytes(payload)

        self.assertEqual(len(received), numfds)
        self.checkFDs(received)

    def testFDPassSimple(self):
        # One FD, sent as a bytes object produced from an array.
        result = self.doRecvmsg(self.serv_sock, len(MSG), 10240)
        self.checkRecvmsgFDs(1, result)

    def _testFDPassSimple(self):
        fd_bytes = array.array("i", self.newFDs(1)).tobytes()
        sent = self.sendmsgToServer(
            [MSG],
            [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fd_bytes)])
        self.assertEqual(sent, len(MSG))

    def testMultipleFDPass(self):
        # Several FDs carried by one array.
        result = self.doRecvmsg(self.serv_sock, len(MSG), 10240)
        self.checkRecvmsgFDs(4, result)

    def _testMultipleFDPass(self):
        self.createAndSendFDs(4)

    @requireAttrs(socket, "CMSG_SPACE")
    def testFDPassCMSG_SPACE(self):
        # Size the ancillary buffer with CMSG_SPACE().
        ancbufsize = socket.CMSG_SPACE(4 * SIZEOF_INT)
        result = self.doRecvmsg(self.serv_sock, len(MSG), ancbufsize)
        self.checkRecvmsgFDs(4, result)

    @testFDPassCMSG_SPACE.client_skip
    def _testFDPassCMSG_SPACE(self):
        self.createAndSendFDs(4)

    def testFDPassCMSG_LEN(self):
        # Size the ancillary buffer with CMSG_LEN().
        ancbufsize = socket.CMSG_LEN(4 * SIZEOF_INT)
        result = self.doRecvmsg(self.serv_sock, len(MSG), ancbufsize)
        # According to RFC 3542, an implementation may set MSG_CTRUNC
        # when there is no room for trailing padding, so that flag
        # is not examined.
        self.checkRecvmsgFDs(1, result, ignoreflags=socket.MSG_CTRUNC)

    def _testFDPassCMSG_LEN(self):
        self.createAndSendFDs(1)

    @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
    @unittest.skipIf(sys.platform.startswith("aix"), "skipping, see issue #22397")
    @requireAttrs(socket, "CMSG_SPACE")
    def testFDPassSeparate(self):
        # Two FDs sent in two separate arrays.  The OS may merge the
        # arrays into a single control message.
        result = self.doRecvmsg(self.serv_sock, len(MSG), 10240)
        self.checkRecvmsgFDs(2, result, maxcmsgs=2)

    @testFDPassSeparate.client_skip
    @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
    @unittest.skipIf(sys.platform.startswith("aix"), "skipping, see issue #22397")
    def _testFDPassSeparate(self):
        first_fd, second_fd = self.newFDs(2)
        ancillary = [(socket.SOL_SOCKET,
                      socket.SCM_RIGHTS,
                      array.array("i", [fd]))
                     for fd in (first_fd, second_fd)]
        sent = self.sendmsgToServer([MSG], ancillary)
        self.assertEqual(sent, len(MSG))

    @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
    @unittest.skipIf(sys.platform.startswith("aix"), "skipping, see issue #22397")
    @requireAttrs(socket, "CMSG_SPACE")
    def testFDPassSeparateMinSpace(self):
        # Two FDs sent in two separate arrays and received into the
        # minimum space for two arrays.
        num_fds = 2
        ancbufsize = (socket.CMSG_SPACE(SIZEOF_INT) +
                      socket.CMSG_LEN(SIZEOF_INT * num_fds))
        result = self.doRecvmsg(self.serv_sock, len(MSG), ancbufsize)
        self.checkRecvmsgFDs(num_fds, result,
                             maxcmsgs=2, ignoreflags=socket.MSG_CTRUNC)

    @testFDPassSeparateMinSpace.client_skip
    @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
    @unittest.skipIf(sys.platform.startswith("aix"), "skipping, see issue #22397")
    def _testFDPassSeparateMinSpace(self):
        first_fd, second_fd = self.newFDs(2)
        ancillary = [(socket.SOL_SOCKET,
                      socket.SCM_RIGHTS,
                      array.array("i", [fd]))
                     for fd in (first_fd, second_fd)]
        sent = self.sendmsgToServer([MSG], ancillary)
        self.assertEqual(sent, len(MSG))

    def sendAncillaryIfPossible(self, msg, ancdata):
        # Send msg with ancdata to the server; should the system call
        # fail, send msg again with no ancillary data at all.
        try:
            sent = self.sendmsgToServer([msg], ancdata)
        except OSError as exc:
            # Make sure the failure came from the system call.
            self.assertIsInstance(exc.errno, int)
            sent = self.sendmsgToServer([msg])
        self.assertEqual(sent, len(msg))

    @unittest.skipIf(sys.platform == "darwin", "see issue #24725")
    def testFDPassEmpty(self):
        # Attempt to pass an empty FD array.  Either no array or an
        # empty array may arrive.
        result = self.doRecvmsg(self.serv_sock, len(MSG), 10240)
        self.checkRecvmsgFDs(0, result, ignoreflags=socket.MSG_CTRUNC)

    def _testFDPassEmpty(self):
        empty_item = (socket.SOL_SOCKET, socket.SCM_RIGHTS, b"")
        self.sendAncillaryIfPossible(MSG, [empty_item])

    def testFDPassPartialInt(self):
        # Attempt to pass an FD array that is cut short in the
        # middle of an integer.
        result = self.doRecvmsg(self.serv_sock, len(MSG), 10240)
        data, cmsgs, msg_flags, sender = result
        self.assertEqual(data, MSG)
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.checkFlags(msg_flags, eor=True, ignore=socket.MSG_CTRUNC)
        self.assertLessEqual(len(cmsgs), 1)
        for level, kind, payload in cmsgs:
            self.assertEqual(level, socket.SOL_SOCKET)
            self.assertEqual(kind, socket.SCM_RIGHTS)
            self.assertLess(len(payload), SIZEOF_INT)

    def _testFDPassPartialInt(self):
        # One int with its final byte removed.
        truncated = array.array("i", [self.badfd]).tobytes()[:-1]
        self.sendAncillaryIfPossible(
            MSG,
            [(socket.SOL_SOCKET, socket.SCM_RIGHTS, truncated)])

    @requireAttrs(socket, "CMSG_SPACE")
    def testFDPassPartialIntInMiddle(self):
        # Attempt to pass two FD arrays of which the first one is
        # truncated.
        result = self.doRecvmsg(self.serv_sock, len(MSG), 10240)
        data, cmsgs, msg_flags, sender = result
        self.assertEqual(data, MSG)
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.checkFlags(msg_flags, eor=True, ignore=socket.MSG_CTRUNC)
        self.assertLessEqual(len(cmsgs), 2)
        received = array.array("i")
        # The arrays may have been merged into one control message.
        for level, kind, payload in cmsgs:
            self.assertEqual(level, socket.SOL_SOCKET)
            self.assertEqual(kind, socket.SCM_RIGHTS)
            # Keep only the bytes that form whole integers.
            whole = len(payload) - len(payload) % received.itemsize
            received.frombytes(payload[:whole])
        self.assertLessEqual(len(received), 2)
        self.checkFDs(received)

    @testFDPassPartialIntInMiddle.client_skip
    def _testFDPassPartialIntInMiddle(self):
        first_fd, second_fd = self.newFDs(2)
        # First array: a valid FD followed by an int that lacks its
        # final byte.
        truncated = array.array("i", [first_fd, self.badfd]).tobytes()[:-1]
        complete = array.array("i", [second_fd])
        self.sendAncillaryIfPossible(
            MSG,
            [(socket.SOL_SOCKET, socket.SCM_RIGHTS, truncated),
             (socket.SOL_SOCKET, socket.SCM_RIGHTS, complete)])

    def checkTruncatedHeader(self, result, ignoreflags=0):
        # Verify that result contains no ancillary data items, as
        # expected when the data is truncated inside the cmsghdr
        # structure.  MSG_CTRUNC must be set unless listed in
        # ignoreflags.
        data, cmsgs, msg_flags, sender = result
        self.assertEqual(data, MSG)
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.assertEqual(cmsgs, [])
        self.checkFlags(msg_flags, eor=True, checkset=socket.MSG_CTRUNC,
                        ignore=ignoreflags)

    def testCmsgTruncNoBufSize(self):
        # Without an ancillary buffer size, no ancillary data is
        # received.
        result = self.doRecvmsg(self.serv_sock, len(MSG))
        # MSG_CTRUNC is ignored here: BSD seems to set it only if an
        # item has been partially received.
        self.checkTruncatedHeader(result, ignoreflags=socket.MSG_CTRUNC)

    def _testCmsgTruncNoBufSize(self):
        self.createAndSendFDs(1)

    def testCmsgTrunc0(self):
        # With an ancillary buffer size of 0, no ancillary data is
        # received.
        result = self.doRecvmsg(self.serv_sock, len(MSG), 0)
        self.checkTruncatedHeader(result, ignoreflags=socket.MSG_CTRUNC)

    def _testCmsgTrunc0(self):
        self.createAndSendFDs(1)

    # The next few tests use buffer sizes that are non-zero yet still
    # too small, and verify that no ancillary data is returned.

    def testCmsgTrunc1(self):
        result = self.doRecvmsg(self.serv_sock, len(MSG), 1)
        self.checkTruncatedHeader(result)

    def _testCmsgTrunc1(self):
        self.createAndSendFDs(1)

    def testCmsgTrunc2Int(self):
        # Two ints is still less than a cmsghdr structure, which has
        # at least three members, two of them ints; hence no
        # ancillary data should show up.
        result = self.doRecvmsg(self.serv_sock, len(MSG), SIZEOF_INT * 2)
        self.checkTruncatedHeader(result)

    def _testCmsgTrunc2Int(self):
        self.createAndSendFDs(1)

    def testCmsgTruncLen0Minus1(self):
        ancbufsize = socket.CMSG_LEN(0) - 1
        result = self.doRecvmsg(self.serv_sock, len(MSG), ancbufsize)
        self.checkTruncatedHeader(result)

    def _testCmsgTruncLen0Minus1(self):
        self.createAndSendFDs(1)

    # The remaining tests attempt to truncate the control message
    # somewhere inside the FD array.

    def checkTruncatedArray(self, ancbuf, maxdata, mindata=0):
        # Receive with an ancillary buffer of ancbuf bytes and verify
        # that the file descriptor data was truncated to between
        # mindata and maxdata bytes, and that every complete file
        # descriptor number in it is valid.
        result = self.doRecvmsg(self.serv_sock, len(MSG), ancbuf)
        data, cmsgs, msg_flags, sender = result
        self.assertEqual(data, MSG)
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.checkFlags(msg_flags, eor=True, checkset=socket.MSG_CTRUNC)

        # Receiving no item at all is fine when no minimum is set.
        if mindata == 0 and cmsgs == []:
            return
        self.assertEqual(len(cmsgs), 1)
        level, kind, payload = cmsgs[0]
        self.assertEqual(level, socket.SOL_SOCKET)
        self.assertEqual(kind, socket.SCM_RIGHTS)
        self.assertGreaterEqual(len(payload), mindata)
        self.assertLessEqual(len(payload), maxdata)
        received = array.array("i")
        # Keep only the bytes that form whole integers.
        whole = len(payload) - len(payload) % received.itemsize
        received.frombytes(payload[:whole])
        self.checkFDs(received)

    def testCmsgTruncLen0(self):
        self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(0), maxdata=0)

    def _testCmsgTruncLen0(self):
        self.createAndSendFDs(1)

    def testCmsgTruncLen0Plus1(self):
        self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(0) + 1, maxdata=1)

    def _testCmsgTruncLen0Plus1(self):
        self.createAndSendFDs(2)

    def testCmsgTruncLen1(self):
        ancbufsize = socket.CMSG_LEN(SIZEOF_INT)
        self.checkTruncatedArray(ancbuf=ancbufsize, maxdata=SIZEOF_INT)

    def _testCmsgTruncLen1(self):
        self.createAndSendFDs(2)

    def testCmsgTruncLen2Minus1(self):
        ancbufsize = socket.CMSG_LEN(2 * SIZEOF_INT) - 1
        self.checkTruncatedArray(ancbuf=ancbufsize,
                                 maxdata=(2 * SIZEOF_INT) - 1)

    def _testCmsgTruncLen2Minus1(self):
        self.createAndSendFDs(2)
3074
3075
class RFC3542AncillaryTest(SendrecvmsgServerTimeoutBase):
    # Test sendmsg() and recvmsg[_into]() using the ancillary data
    # features of the RFC 3542 Advanced Sockets API for IPv6.
    # Currently we can only handle certain data items (e.g. traffic
    # class, hop limit, MTU discovery and fragmentation settings)
    # without resorting to unportable means such as the struct module,
    # but the tests here are aimed at testing the ancillary data
    # handling in sendmsg() and recvmsg() rather than the IPv6 API
    # itself.
    #
    # Each testX() method receives on serv_sock; its _testX()
    # counterpart sends the data.  The receiving side sets misc_event
    # once it has enabled the relevant IPV6_RECV* options, and the
    # sending side waits for that event before transmitting.

    # Test value to use when setting hop limit of packet
    hop_limit = 2

    # Test value to use when setting traffic class of packet.
    # -1 means "use kernel default".
    traffic_class = -1

    def ancillaryMapping(self, ancdata):
        # Given ancillary data list ancdata, return a mapping from
        # pairs (cmsg_level, cmsg_type) to corresponding cmsg_data.
        # Check that no (level, type) pair appears more than once.
        d = {}
        for cmsg_level, cmsg_type, cmsg_data in ancdata:
            self.assertNotIn((cmsg_level, cmsg_type), d)
            d[(cmsg_level, cmsg_type)] = cmsg_data
        return d

    def checkIntCmsgData(self, cmsg_data, maxvalue):
        # Check that cmsg_data is exactly SIZEOF_INT bytes long and
        # that the native int it encodes is between 0 and maxvalue
        # inclusive.
        self.assertEqual(len(cmsg_data), SIZEOF_INT)
        a = array.array("i")
        a.frombytes(cmsg_data)
        self.assertGreaterEqual(a[0], 0)
        self.assertLessEqual(a[0], maxvalue)

    def checkHopLimit(self, ancbufsize, maxhop=255, ignoreflags=0):
        # Receive hop limit into ancbufsize bytes of ancillary data
        # space.  Check that data is MSG, ancillary data is not
        # truncated (but ignore any flags in ignoreflags), and hop
        # limit is between 0 and maxhop inclusive.
        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
                                  socket.IPV6_RECVHOPLIMIT, 1)
        # Tell the sending side that the option is now enabled.
        self.misc_event.set()
        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
                                                   len(MSG), ancbufsize)

        self.assertEqual(msg, MSG)
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC,
                        ignore=ignoreflags)

        self.assertEqual(len(ancdata), 1)
        self.assertIsInstance(ancdata[0], tuple)
        cmsg_level, cmsg_type, cmsg_data = ancdata[0]
        self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
        self.assertEqual(cmsg_type, socket.IPV6_HOPLIMIT)
        self.assertIsInstance(cmsg_data, bytes)
        self.checkIntCmsgData(cmsg_data, maxhop)

    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
    def testRecvHopLimit(self):
        # Test receiving the packet hop limit as ancillary data.
        self.checkHopLimit(ancbufsize=10240)

    @testRecvHopLimit.client_skip
    def _testRecvHopLimit(self):
        # Need to wait until server has asked to receive ancillary
        # data, as implementations are not required to buffer it
        # otherwise.
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
    def testRecvHopLimitCMSG_SPACE(self):
        # Test receiving hop limit, using CMSG_SPACE to calculate buffer size.
        self.checkHopLimit(ancbufsize=socket.CMSG_SPACE(SIZEOF_INT))

    @testRecvHopLimitCMSG_SPACE.client_skip
    def _testRecvHopLimitCMSG_SPACE(self):
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    # Could test receiving into buffer sized using CMSG_LEN, but RFC
    # 3542 says portable applications must provide space for trailing
    # padding.  Implementations may set MSG_CTRUNC if there isn't
    # enough space for the padding.

    @requireAttrs(socket.socket, "sendmsg")
    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
    def testSetHopLimit(self):
        # Test setting hop limit on outgoing packet and receiving it
        # at the other end.
        self.checkHopLimit(ancbufsize=10240, maxhop=self.hop_limit)

    @testSetHopLimit.client_skip
    def _testSetHopLimit(self):
        # Send MSG with an IPV6_HOPLIMIT control message attached.
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.assertEqual(
            self.sendmsgToServer([MSG],
                                 [(socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
                                   array.array("i", [self.hop_limit]))]),
            len(MSG))

    def checkTrafficClassAndHopLimit(self, ancbufsize, maxhop=255,
                                     ignoreflags=0):
        # Receive traffic class and hop limit into ancbufsize bytes of
        # ancillary data space.  Check that data is MSG, ancillary
        # data is not truncated (but ignore any flags in ignoreflags),
        # and traffic class and hop limit are in range (hop limit no
        # more than maxhop).
        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
                                  socket.IPV6_RECVHOPLIMIT, 1)
        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
                                  socket.IPV6_RECVTCLASS, 1)
        self.misc_event.set()
        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
                                                   len(MSG), ancbufsize)

        self.assertEqual(msg, MSG)
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC,
                        ignore=ignoreflags)
        self.assertEqual(len(ancdata), 2)
        ancmap = self.ancillaryMapping(ancdata)

        tcdata = ancmap[(socket.IPPROTO_IPV6, socket.IPV6_TCLASS)]
        self.checkIntCmsgData(tcdata, 255)

        hldata = ancmap[(socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT)]
        self.checkIntCmsgData(hldata, maxhop)

    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
    def testRecvTrafficClassAndHopLimit(self):
        # Test receiving traffic class and hop limit as ancillary data.
        self.checkTrafficClassAndHopLimit(ancbufsize=10240)

    @testRecvTrafficClassAndHopLimit.client_skip
    def _testRecvTrafficClassAndHopLimit(self):
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
    def testRecvTrafficClassAndHopLimitCMSG_SPACE(self):
        # Test receiving traffic class and hop limit, using
        # CMSG_SPACE() to calculate buffer size.
        self.checkTrafficClassAndHopLimit(
            ancbufsize=socket.CMSG_SPACE(SIZEOF_INT) * 2)

    @testRecvTrafficClassAndHopLimitCMSG_SPACE.client_skip
    def _testRecvTrafficClassAndHopLimitCMSG_SPACE(self):
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    @requireAttrs(socket.socket, "sendmsg")
    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
    def testSetTrafficClassAndHopLimit(self):
        # Test setting traffic class and hop limit on outgoing packet,
        # and receiving them at the other end.
        self.checkTrafficClassAndHopLimit(ancbufsize=10240,
                                          maxhop=self.hop_limit)

    @testSetTrafficClassAndHopLimit.client_skip
    def _testSetTrafficClassAndHopLimit(self):
        # Send MSG with both IPV6_TCLASS and IPV6_HOPLIMIT attached.
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.assertEqual(
            self.sendmsgToServer([MSG],
                                 [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS,
                                   array.array("i", [self.traffic_class])),
                                  (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
                                   array.array("i", [self.hop_limit]))]),
            len(MSG))

    @requireAttrs(socket.socket, "sendmsg")
    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
    def testOddCmsgSize(self):
        # Try to send ancillary data with first item one byte too
        # long.  Fall back to sending with correct size if this fails,
        # and check that second item was handled correctly.
        self.checkTrafficClassAndHopLimit(ancbufsize=10240,
                                          maxhop=self.hop_limit)

    @testOddCmsgSize.client_skip
    def _testOddCmsgSize(self):
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        try:
            # First attempt: traffic class item padded with one extra
            # byte so that its size is not a multiple of sizeof(int).
            nbytes = self.sendmsgToServer(
                [MSG],
                [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS,
                  array.array("i", [self.traffic_class]).tobytes() + b"\x00"),
                 (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
                  array.array("i", [self.hop_limit]))])
        except OSError as e:
            # The odd-sized item was rejected: retry with the
            # correctly sized item.
            self.assertIsInstance(e.errno, int)
            nbytes = self.sendmsgToServer(
                [MSG],
                [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS,
                  array.array("i", [self.traffic_class])),
                 (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
                  array.array("i", [self.hop_limit]))])
        # Whichever send went through must have sent the whole message.
        self.assertEqual(nbytes, len(MSG))

    # Tests for proper handling of truncated ancillary data

    def checkHopLimitTruncatedHeader(self, ancbufsize, ignoreflags=0):
        # Receive hop limit into ancbufsize bytes of ancillary data
        # space, which should be too small to contain the ancillary
        # data header (if ancbufsize is None, pass no second argument
        # to recvmsg()).  Check that data is MSG, MSG_CTRUNC is set
        # (unless included in ignoreflags), and no ancillary data is
        # returned.
        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
                                  socket.IPV6_RECVHOPLIMIT, 1)
        self.misc_event.set()
        args = () if ancbufsize is None else (ancbufsize,)
        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
                                                   len(MSG), *args)

        self.assertEqual(msg, MSG)
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.assertEqual(ancdata, [])
        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC,
                        ignore=ignoreflags)

    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
    def testCmsgTruncNoBufSize(self):
        # Check that no ancillary data is received when no ancillary
        # buffer size is provided.
        self.checkHopLimitTruncatedHeader(ancbufsize=None,
                                          # BSD seems to set
                                          # MSG_CTRUNC only if an item
                                          # has been partially
                                          # received.
                                          ignoreflags=socket.MSG_CTRUNC)

    @testCmsgTruncNoBufSize.client_skip
    def _testCmsgTruncNoBufSize(self):
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
    def testSingleCmsgTrunc0(self):
        # Check that no ancillary data is received when ancillary
        # buffer size is zero.
        self.checkHopLimitTruncatedHeader(ancbufsize=0,
                                          ignoreflags=socket.MSG_CTRUNC)

    @testSingleCmsgTrunc0.client_skip
    def _testSingleCmsgTrunc0(self):
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    # Check that no ancillary data is returned for various non-zero
    # (but still too small) buffer sizes.

    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
    def testSingleCmsgTrunc1(self):
        self.checkHopLimitTruncatedHeader(ancbufsize=1)

    @testSingleCmsgTrunc1.client_skip
    def _testSingleCmsgTrunc1(self):
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
    def testSingleCmsgTrunc2Int(self):
        self.checkHopLimitTruncatedHeader(ancbufsize=2 * SIZEOF_INT)

    @testSingleCmsgTrunc2Int.client_skip
    def _testSingleCmsgTrunc2Int(self):
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
    def testSingleCmsgTruncLen0Minus1(self):
        self.checkHopLimitTruncatedHeader(ancbufsize=socket.CMSG_LEN(0) - 1)

    @testSingleCmsgTruncLen0Minus1.client_skip
    def _testSingleCmsgTruncLen0Minus1(self):
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
    def testSingleCmsgTruncInData(self):
        # Test truncation of a control message inside its associated
        # data.  The message may be returned with its data truncated,
        # or not returned at all.
        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
                                  socket.IPV6_RECVHOPLIMIT, 1)
        self.misc_event.set()
        msg, ancdata, flags, addr = self.doRecvmsg(
            self.serv_sock, len(MSG), socket.CMSG_LEN(SIZEOF_INT) - 1)

        self.assertEqual(msg, MSG)
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC)

        self.assertLessEqual(len(ancdata), 1)
        if ancdata:
            cmsg_level, cmsg_type, cmsg_data = ancdata[0]
            self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
            self.assertEqual(cmsg_type, socket.IPV6_HOPLIMIT)
            self.assertLess(len(cmsg_data), SIZEOF_INT)

    @testSingleCmsgTruncInData.client_skip
    def _testSingleCmsgTruncInData(self):
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    def checkTruncatedSecondHeader(self, ancbufsize, ignoreflags=0):
        # Receive traffic class and hop limit into ancbufsize bytes of
        # ancillary data space, which should be large enough to
        # contain the first item, but too small to contain the header
        # of the second.  Check that data is MSG, MSG_CTRUNC is set
        # (unless included in ignoreflags), and only one ancillary
        # data item is returned.
        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
                                  socket.IPV6_RECVHOPLIMIT, 1)
        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
                                  socket.IPV6_RECVTCLASS, 1)
        self.misc_event.set()
        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
                                                   len(MSG), ancbufsize)

        self.assertEqual(msg, MSG)
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC,
                        ignore=ignoreflags)

        self.assertEqual(len(ancdata), 1)
        cmsg_level, cmsg_type, cmsg_data = ancdata[0]
        self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
        # Either item may come first, so accept both types.
        self.assertIn(cmsg_type, {socket.IPV6_TCLASS, socket.IPV6_HOPLIMIT})
        self.checkIntCmsgData(cmsg_data, 255)

    # Try the above test with various buffer sizes.

    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
    def testSecondCmsgTrunc0(self):
        self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT),
                                        ignoreflags=socket.MSG_CTRUNC)

    @testSecondCmsgTrunc0.client_skip
    def _testSecondCmsgTrunc0(self):
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
    def testSecondCmsgTrunc1(self):
        self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) + 1)

    @testSecondCmsgTrunc1.client_skip
    def _testSecondCmsgTrunc1(self):
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
    def testSecondCmsgTrunc2Int(self):
        self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) +
                                        2 * SIZEOF_INT)

    @testSecondCmsgTrunc2Int.client_skip
    def _testSecondCmsgTrunc2Int(self):
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
    def testSecondCmsgTruncLen0Minus1(self):
        self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) +
                                        socket.CMSG_LEN(0) - 1)

    @testSecondCmsgTruncLen0Minus1.client_skip
    def _testSecondCmsgTruncLen0Minus1(self):
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    # NOTE: "Secomd" below is a misspelling of "Second".  The name is
    # kept as-is so that the test ID and the testX/_testX pairing do
    # not change.
    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
    def testSecomdCmsgTruncInData(self):
        # Test truncation of the second of two control messages inside
        # its associated data.
        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
                                  socket.IPV6_RECVHOPLIMIT, 1)
        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
                                  socket.IPV6_RECVTCLASS, 1)
        self.misc_event.set()
        msg, ancdata, flags, addr = self.doRecvmsg(
            self.serv_sock, len(MSG),
            socket.CMSG_SPACE(SIZEOF_INT) + socket.CMSG_LEN(SIZEOF_INT) - 1)

        self.assertEqual(msg, MSG)
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC)

        # Types still expected; each one seen is removed, so a
        # repeated or unknown type raises KeyError.
        cmsg_types = {socket.IPV6_TCLASS, socket.IPV6_HOPLIMIT}

        # The first item must be complete.
        cmsg_level, cmsg_type, cmsg_data = ancdata.pop(0)
        self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
        cmsg_types.remove(cmsg_type)
        self.checkIntCmsgData(cmsg_data, 255)

        # The second item, if returned at all, must be truncated.
        if ancdata:
            cmsg_level, cmsg_type, cmsg_data = ancdata.pop(0)
            self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
            cmsg_types.remove(cmsg_type)
            self.assertLess(len(cmsg_data), SIZEOF_INT)

        self.assertEqual(ancdata, [])

    @testSecomdCmsgTruncInData.client_skip
    def _testSecomdCmsgTruncInData(self):
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)
3508
3509
3510# Derive concrete test classes for different socket types.
3511
class SendrecvmsgUDPTestBase(SendrecvmsgDgramFlagsBase,
                             SendrecvmsgConnectionlessBase,
                             ThreadedSocketTestMixin, UDPTestBase):
    # Common base for the UDP sendmsg()/recvmsg() test classes below.
    # It only combines the listed bases and adds nothing of its own.
    pass
3516
@requireAttrs(socket.socket, "sendmsg")
@unittest.skipUnless(thread, 'Threading required for this test.')
class SendmsgUDPTest(SendmsgConnectionlessTests, SendrecvmsgUDPTestBase):
    # SendmsgConnectionlessTests combined with the UDP base class.
    # Skipped when the thread module is unavailable; requireAttrs
    # (defined elsewhere) is passed socket.socket and "sendmsg".
    pass
3521
@requireAttrs(socket.socket, "recvmsg")
@unittest.skipUnless(thread, 'Threading required for this test.')
class RecvmsgUDPTest(RecvmsgTests, SendrecvmsgUDPTestBase):
    # RecvmsgTests combined with the UDP base class.  Skipped when the
    # thread module is unavailable; requireAttrs (defined elsewhere)
    # is passed socket.socket and "recvmsg".
    pass
3526
@requireAttrs(socket.socket, "recvmsg_into")
@unittest.skipUnless(thread, 'Threading required for this test.')
class RecvmsgIntoUDPTest(RecvmsgIntoTests, SendrecvmsgUDPTestBase):
    # RecvmsgIntoTests combined with the UDP base class.  Skipped when
    # the thread module is unavailable; requireAttrs (defined
    # elsewhere) is passed socket.socket and "recvmsg_into".
    pass
3531
3532
class SendrecvmsgUDP6TestBase(SendrecvmsgDgramFlagsBase,
                              SendrecvmsgConnectionlessBase,
                              ThreadedSocketTestMixin, UDP6TestBase):
    # Common base for the UDP-over-IPv6 sendmsg()/recvmsg() test
    # classes below.

    def checkRecvmsgAddress(self, addr1, addr2):
        # Compare the received address with the peer's address after
        # dropping the last element of each (the scope ID), which is
        # allowed to differ.
        received = addr1[:-1]
        expected = addr2[:-1]
        self.assertEqual(received, expected)
3541
@requireAttrs(socket.socket, "sendmsg")
@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
@requireSocket("AF_INET6", "SOCK_DGRAM")
@unittest.skipUnless(thread, 'Threading required for this test.')
class SendmsgUDP6Test(SendmsgConnectionlessTests, SendrecvmsgUDP6TestBase):
    # SendmsgConnectionlessTests combined with the UDP/IPv6 base
    # class.  Skipped unless support.IPV6_ENABLED is true and the
    # thread module is available.
    pass
3548
@requireAttrs(socket.socket, "recvmsg")
@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
@requireSocket("AF_INET6", "SOCK_DGRAM")
@unittest.skipUnless(thread, 'Threading required for this test.')
class RecvmsgUDP6Test(RecvmsgTests, SendrecvmsgUDP6TestBase):
    # RecvmsgTests combined with the UDP/IPv6 base class.  Skipped
    # unless support.IPV6_ENABLED is true and the thread module is
    # available.
    pass
3555
@requireAttrs(socket.socket, "recvmsg_into")
@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
@requireSocket("AF_INET6", "SOCK_DGRAM")
@unittest.skipUnless(thread, 'Threading required for this test.')
class RecvmsgIntoUDP6Test(RecvmsgIntoTests, SendrecvmsgUDP6TestBase):
    # RecvmsgIntoTests combined with the UDP/IPv6 base class.  Skipped
    # unless support.IPV6_ENABLED is true and the thread module is
    # available.
    pass
3562
@requireAttrs(socket.socket, "recvmsg")
@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
@requireAttrs(socket, "IPPROTO_IPV6")
@requireSocket("AF_INET6", "SOCK_DGRAM")
@unittest.skipUnless(thread, 'Threading required for this test.')
class RecvmsgRFC3542AncillaryUDP6Test(RFC3542AncillaryTest,
                                      SendrecvmsgUDP6TestBase):
    # RFC3542AncillaryTest combined with the UDP/IPv6 base class.
    # Skipped unless support.IPV6_ENABLED is true and the thread
    # module is available.
    pass
3571
@requireAttrs(socket.socket, "recvmsg_into")
@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
@requireAttrs(socket, "IPPROTO_IPV6")
@requireSocket("AF_INET6", "SOCK_DGRAM")
@unittest.skipUnless(thread, 'Threading required for this test.')
class RecvmsgIntoRFC3542AncillaryUDP6Test(RecvmsgIntoMixin,
                                          RFC3542AncillaryTest,
                                          SendrecvmsgUDP6TestBase):
    # RFC3542AncillaryTest combined with the UDP/IPv6 base class, with
    # RecvmsgIntoMixin listed first in the bases (presumably so the
    # tests go through recvmsg_into(); the mixin is defined elsewhere).
    pass
3581
3582
class SendrecvmsgTCPTestBase(SendrecvmsgConnectedBase,
                             ConnectedStreamTestMixin, TCPTestBase):
    # Common base for the TCP sendmsg()/recvmsg() test classes below.
    # It only combines the listed bases and adds nothing of its own.
    pass
3586
@requireAttrs(socket.socket, "sendmsg")
@unittest.skipUnless(thread, 'Threading required for this test.')
class SendmsgTCPTest(SendmsgStreamTests, SendrecvmsgTCPTestBase):
    # SendmsgStreamTests combined with the TCP base class.  Skipped
    # when the thread module is unavailable.
    pass
3591
@requireAttrs(socket.socket, "recvmsg")
@unittest.skipUnless(thread, 'Threading required for this test.')
class RecvmsgTCPTest(RecvmsgTests, RecvmsgGenericStreamTests,
                     SendrecvmsgTCPTestBase):
    # RecvmsgTests and RecvmsgGenericStreamTests combined with the TCP
    # base class.  Skipped when the thread module is unavailable.
    pass
3597
@requireAttrs(socket.socket, "recvmsg_into")
@unittest.skipUnless(thread, 'Threading required for this test.')
class RecvmsgIntoTCPTest(RecvmsgIntoTests, RecvmsgGenericStreamTests,
                         SendrecvmsgTCPTestBase):
    # RecvmsgIntoTests and RecvmsgGenericStreamTests combined with the
    # TCP base class.  Skipped when the thread module is unavailable.
    pass
3603
3604
class SendrecvmsgSCTPStreamTestBase(SendrecvmsgSCTPFlagsBase,
                                    SendrecvmsgConnectedBase,
                                    ConnectedStreamTestMixin, SCTPStreamBase):
    # Common base for the SCTP stream sendmsg()/recvmsg() test classes
    # below.  It only combines the listed bases.
    pass
3609
@requireAttrs(socket.socket, "sendmsg")
@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP")
@unittest.skipUnless(thread, 'Threading required for this test.')
class SendmsgSCTPStreamTest(SendmsgStreamTests, SendrecvmsgSCTPStreamTestBase):
    # SendmsgStreamTests combined with the SCTP stream base class.
    # Skipped when the thread module is unavailable.
    pass
3615
@requireAttrs(socket.socket, "recvmsg")
@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP")
@unittest.skipUnless(thread, 'Threading required for this test.')
class RecvmsgSCTPStreamTest(RecvmsgTests, RecvmsgGenericStreamTests,
                            SendrecvmsgSCTPStreamTestBase):
    # RecvmsgTests and RecvmsgGenericStreamTests combined with the SCTP
    # stream base class.

    def testRecvmsgEOF(self):
        # Run the inherited test, but turn an ENOTCONN failure into a
        # skip; any other OSError is re-raised unchanged.
        try:
            super(RecvmsgSCTPStreamTest, self).testRecvmsgEOF()
        except OSError as exc:
            if exc.errno == errno.ENOTCONN:
                # skipTest() raises, so the re-raise below is not reached.
                self.skipTest("sporadic ENOTCONN (kernel issue?) - see issue #13876")
            raise
3629
@requireAttrs(socket.socket, "recvmsg_into")
@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP")
@unittest.skipUnless(thread, 'Threading required for this test.')
class RecvmsgIntoSCTPStreamTest(RecvmsgIntoTests, RecvmsgGenericStreamTests,
                                SendrecvmsgSCTPStreamTestBase):
    # RecvmsgIntoTests and RecvmsgGenericStreamTests combined with the
    # SCTP stream base class.

    def testRecvmsgEOF(self):
        # Run the inherited test, but turn an ENOTCONN failure into a
        # skip; any other OSError is re-raised unchanged.
        try:
            super(RecvmsgIntoSCTPStreamTest, self).testRecvmsgEOF()
        except OSError as exc:
            if exc.errno == errno.ENOTCONN:
                # skipTest() raises, so the re-raise below is not reached.
                self.skipTest("sporadic ENOTCONN (kernel issue?) - see issue #13876")
            raise
3643
3644
class SendrecvmsgUnixStreamTestBase(SendrecvmsgConnectedBase,
                                    ConnectedStreamTestMixin, UnixStreamBase):
    # Common base for the Unix stream socket sendmsg()/recvmsg() test
    # classes below.  It only combines the listed bases.
    pass
3648
@requireAttrs(socket.socket, "sendmsg")
@requireAttrs(socket, "AF_UNIX")
@unittest.skipUnless(thread, 'Threading required for this test.')
class SendmsgUnixStreamTest(SendmsgStreamTests, SendrecvmsgUnixStreamTestBase):
    # SendmsgStreamTests combined with the Unix stream base class.
    # Skipped when the thread module is unavailable.
    pass
3654
@requireAttrs(socket.socket, "recvmsg")
@requireAttrs(socket, "AF_UNIX")
@unittest.skipUnless(thread, 'Threading required for this test.')
class RecvmsgUnixStreamTest(RecvmsgTests, RecvmsgGenericStreamTests,
                            SendrecvmsgUnixStreamTestBase):
    # RecvmsgTests and RecvmsgGenericStreamTests combined with the Unix
    # stream base class.  Skipped when the thread module is unavailable.
    pass
3661
@requireAttrs(socket.socket, "recvmsg_into")
@requireAttrs(socket, "AF_UNIX")
@unittest.skipUnless(thread, 'Threading required for this test.')
class RecvmsgIntoUnixStreamTest(RecvmsgIntoTests, RecvmsgGenericStreamTests,
                                SendrecvmsgUnixStreamTestBase):
    # RecvmsgIntoTests and RecvmsgGenericStreamTests combined with the
    # Unix stream base class.  Skipped when the thread module is
    # unavailable.
    pass
3668
@requireAttrs(socket.socket, "sendmsg", "recvmsg")
@requireAttrs(socket, "AF_UNIX", "SOL_SOCKET", "SCM_RIGHTS")
@unittest.skipUnless(thread, 'Threading required for this test.')
class RecvmsgSCMRightsStreamTest(SCMRightsTest, SendrecvmsgUnixStreamTestBase):
    # SCMRightsTest combined with the Unix stream base class.  Skipped
    # when the thread module is unavailable.
    pass
3674
@requireAttrs(socket.socket, "sendmsg", "recvmsg_into")
@requireAttrs(socket, "AF_UNIX", "SOL_SOCKET", "SCM_RIGHTS")
@unittest.skipUnless(thread, 'Threading required for this test.')
class RecvmsgIntoSCMRightsStreamTest(RecvmsgIntoMixin, SCMRightsTest,
                                     SendrecvmsgUnixStreamTestBase):
    # SCMRightsTest combined with the Unix stream base class, with
    # RecvmsgIntoMixin listed first in the bases (presumably so the
    # tests go through recvmsg_into(); the mixin is defined elsewhere).
    pass
3681
3682
3683# Test interrupting the interruptible send/receive methods with a
3684# signal when a timeout is set.  These tests avoid having multiple
3685# threads alive during the test so that the OS cannot deliver the
3686# signal to the wrong one.
3687
class InterruptedTimeoutBase(unittest.TestCase):
    # Base class for interrupted send/receive tests.  setUp() installs
    # a SIGALRM handler that raises ZeroDivisionError and registers a
    # cleanup that restores the previous handler.  Pending alarms are
    # not cancelled here; callers use setAlarm(0) for that.

    # Timeout for socket operations
    timeout = 4.0

    def setUp(self):
        super().setUp()

        def raise_on_alarm(signum, frame):
            # Dividing by zero makes the handler raise
            # ZeroDivisionError in the interrupted code.
            1 / 0

        previous_handler = signal.signal(signal.SIGALRM, raise_on_alarm)
        self.addCleanup(signal.signal, signal.SIGALRM, previous_handler)

    # setAlarm(seconds) schedules delivery of SIGALRM after the given
    # number of seconds, or cancels it if zero; alarm_time is a
    # suitable delay to pass to it.  setitimer() is preferred when the
    # signal module provides it.
    if not hasattr(signal, "setitimer"):
        # Old systems may deliver the alarm up to one second early
        alarm_time = 2

        def setAlarm(self, seconds):
            signal.alarm(seconds)
    else:
        alarm_time = 0.05

        def setAlarm(self, seconds):
            signal.setitimer(signal.ITIMER_REAL, seconds)
3716
3717
3718# Require siginterrupt() in order to ensure that system calls are
3719# interrupted by default.
@requireAttrs(signal, "siginterrupt")
@unittest.skipUnless(hasattr(signal, "alarm") or hasattr(signal, "setitimer"),
                     "Don't have signal.alarm or signal.setitimer")
class InterruptedRecvTimeoutTest(InterruptedTimeoutBase, UDPTestBase):
    # Test interrupting the recv*() methods with signals when a
    # timeout is set.

    def setUp(self):
        super().setUp()
        # Give the receiving socket a timeout so that the recv*()
        # calls below run with a timeout set.
        self.serv.settimeout(self.timeout)

    def checkInterruptedRecv(self, func, *args, **kwargs):
        # Check that func(*args, **kwargs) is interrupted by SIGALRM:
        # the handler installed by InterruptedTimeoutBase.setUp()
        # raises ZeroDivisionError, which must propagate out of the
        # call.  The alarm is always cancelled afterwards.
        try:
            self.setAlarm(self.alarm_time)
            with self.assertRaises(ZeroDivisionError):
                func(*args, **kwargs)
        finally:
            self.setAlarm(0)

    def testInterruptedRecvTimeout(self):
        self.checkInterruptedRecv(self.serv.recv, 1024)

    def testInterruptedRecvIntoTimeout(self):
        self.checkInterruptedRecv(self.serv.recv_into, bytearray(1024))

    def testInterruptedRecvfromTimeout(self):
        self.checkInterruptedRecv(self.serv.recvfrom, 1024)

    def testInterruptedRecvfromIntoTimeout(self):
        self.checkInterruptedRecv(self.serv.recvfrom_into, bytearray(1024))

    @requireAttrs(socket.socket, "recvmsg")
    def testInterruptedRecvmsgTimeout(self):
        self.checkInterruptedRecv(self.serv.recvmsg, 1024)

    @requireAttrs(socket.socket, "recvmsg_into")
    def testInterruptedRecvmsgIntoTimeout(self):
        self.checkInterruptedRecv(self.serv.recvmsg_into, [bytearray(1024)])
3760
3761
3762# Require siginterrupt() in order to ensure that system calls are
3763# interrupted by default.
@requireAttrs(signal, "siginterrupt")
@unittest.skipUnless(hasattr(signal, "alarm") or hasattr(signal, "setitimer"),
                     "Don't have signal.alarm or signal.setitimer")
@unittest.skipUnless(thread, 'Threading required for this test.')
class InterruptedSendTimeoutTest(InterruptedTimeoutBase,
                                 ThreadSafeCleanupTestCase,
                                 SocketListeningTestMixin, TCPTestBase):
    # Test interrupting the interruptible send*() methods with signals
    # when a timeout is set.

    def setUp(self):
        super().setUp()
        self.serv_conn = self.newSocket()
        self.addCleanup(self.serv_conn.close)
        # Use a thread to complete the connection, but wait for it to
        # terminate before running the test, so that there is only one
        # thread to accept the signal.
        cli_thread = threading.Thread(target=self.doConnect)
        cli_thread.start()
        self.cli_conn, addr = self.serv.accept()
        self.addCleanup(self.cli_conn.close)
        cli_thread.join()
        self.serv_conn.settimeout(self.timeout)

    def doConnect(self):
        # Runs in the helper thread started by setUp().
        self.serv_conn.connect(self.serv_addr)

    def checkInterruptedSend(self, func, *args, **kwargs):
        # Call func(*args, **kwargs) in a loop, re-arming the alarm
        # before each call, and check that ZeroDivisionError eventually
        # propagates out of it.  Nothing reads from the peer socket
        # during the loop, so the sends are expected to end up blocking.
        # NOTE(review): the exception presumably comes from the SIGALRM
        # handler installed by InterruptedTimeoutBase (not visible in
        # this section) -- confirm there.
        try:
            with self.assertRaises(ZeroDivisionError):
                while True:
                    self.setAlarm(self.alarm_time)
                    func(*args, **kwargs)
        finally:
            # Cancel the alarm whatever happened, so that it cannot fire
            # once this check is over.
            self.setAlarm(0)

    # Issue #12958: The following tests have problems on OS X prior to 10.7
    @support.requires_mac_ver(10, 7)
    def testInterruptedSendTimeout(self):
        self.checkInterruptedSend(self.serv_conn.send, b"a"*512)

    @support.requires_mac_ver(10, 7)
    def testInterruptedSendtoTimeout(self):
        # Passing an actual address here as Python's wrapper for
        # sendto() doesn't allow passing a zero-length one; POSIX
        # requires that the address is ignored since the socket is
        # connection-mode, however.
        self.checkInterruptedSend(self.serv_conn.sendto, b"a"*512,
                                  self.serv_addr)

    @support.requires_mac_ver(10, 7)
    @requireAttrs(socket.socket, "sendmsg")
    def testInterruptedSendmsgTimeout(self):
        # sendmsg() takes an iterable of buffers, hence the list.
        self.checkInterruptedSend(self.serv_conn.sendmsg, [b"a"*512])
3821
3822
@unittest.skipUnless(thread, 'Threading required for this test.')
class TCPCloserTest(ThreadedTCPSocketTest):
    # Closing an accepted connection must be seen as EOF by the peer.

    def testClose(self):
        # Accept the client's connection and drop it straight away.
        accepted, _peer = self.serv.accept()
        accepted.close()

        # The client socket has to become readable and report EOF.
        client = self.cli
        readable, _writable, _errored = select.select([client], [], [], 1.0)
        self.assertEqual(readable, [client])
        self.assertEqual(client.recv(1), b'')

        # Extra close() calls on a closed socket must be harmless.
        for _ in range(2):
            accepted.close()

    def _testClose(self):
        server_address = (HOST, self.port)
        self.cli.connect(server_address)
        # Keep the client side around while testClose() does its checks.
        time.sleep(1.0)
3842
@unittest.skipUnless(thread, 'Threading required for this test.')
class BasicSocketPairTest(SocketPairTest):
    # Basic checks on the two sockets made available by SocketPairTest
    # as self.serv and self.cli.

    def __init__(self, methodName='runTest'):
        SocketPairTest.__init__(self, methodName=methodName)

    def _check_defaults(self, sock):
        """Check the class, family, type and protocol of *sock*."""
        self.assertIsInstance(sock, socket.socket)
        # AF_UNIX is expected where the platform has it, AF_INET otherwise.
        expected_family = (socket.AF_UNIX if hasattr(socket, 'AF_UNIX')
                           else socket.AF_INET)
        self.assertEqual(sock.family, expected_family)
        self.assertEqual(sock.type, socket.SOCK_STREAM)
        self.assertEqual(sock.proto, 0)

    def _testDefaults(self):
        self._check_defaults(self.cli)

    def testDefaults(self):
        self._check_defaults(self.serv)

    def testRecv(self):
        # self.serv receives what _testRecv() sends through self.cli.
        received = self.serv.recv(1024)
        self.assertEqual(received, MSG)

    def _testRecv(self):
        self.cli.send(MSG)

    def testSend(self):
        self.serv.send(MSG)

    def _testSend(self):
        # self.cli receives what testSend() sends through self.serv.
        received = self.cli.recv(1024)
        self.assertEqual(received, MSG)
3877
@unittest.skipUnless(thread, 'Threading required for this test.')
class NonBlockingTCPTests(ThreadedTCPSocketTest):
    # Tests for non-blocking mode and timeouts on TCP sockets.

    def __init__(self, methodName='runTest'):
        # Used by testAccept/testRecv to hold back the matching _test*
        # method until the "nothing available yet" check has been made.
        self.event = threading.Event()
        ThreadedTCPSocketTest.__init__(self, methodName=methodName)

    def _checkAcceptReturnsAtOnce(self, msg):
        # Call accept() on the listening socket and check that it comes
        # back in under a second, whether or not it raises OSError.
        # msg is reported if the call took too long.  The monotonic
        # clock is used so that a system clock adjustment cannot distort
        # the measurement.
        start = time.monotonic()
        try:
            self.serv.accept()
        except OSError:
            pass
        elapsed = time.monotonic() - start
        self.assertLess(elapsed, 1.0, msg)

    def testSetBlocking(self):
        # Testing whether set blocking works
        self.serv.setblocking(True)
        self.assertIsNone(self.serv.gettimeout())
        self.serv.setblocking(False)
        self.assertEqual(self.serv.gettimeout(), 0.0)
        self._checkAcceptReturnsAtOnce("Error setting non-blocking mode.")

    def _testSetBlocking(self):
        pass

    @support.cpython_only
    def testSetBlocking_overflow(self):
        # Issue 15989
        import _testcapi
        if _testcapi.UINT_MAX >= _testcapi.ULONG_MAX:
            self.skipTest('needs UINT_MAX < ULONG_MAX')
        self.serv.setblocking(False)
        self.assertEqual(self.serv.gettimeout(), 0.0)
        # A value that does not fit in an unsigned int must still be
        # treated as true, i.e. switch the socket back to blocking mode.
        self.serv.setblocking(_testcapi.UINT_MAX + 1)
        self.assertIsNone(self.serv.gettimeout())

    _testSetBlocking_overflow = support.cpython_only(_testSetBlocking)

    @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'),
                         'test needs socket.SOCK_NONBLOCK')
    @support.requires_linux_version(2, 6, 28)
    def testInitNonBlocking(self):
        # reinit server socket
        self.serv.close()
        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM |
                                                  socket.SOCK_NONBLOCK)
        self.port = support.bind_port(self.serv)
        self.serv.listen()
        # actual testing
        self._checkAcceptReturnsAtOnce(
            "Error creating with non-blocking mode.")

    def _testInitNonBlocking(self):
        pass

    def testInheritFlags(self):
        # Issue #7995: when calling accept() on a listening socket with a
        # timeout, the resulting socket should not be non-blocking.
        self.serv.settimeout(10)
        try:
            conn, addr = self.serv.accept()
            # Register the cleanup only once the connection exists: a
            # conn.close() in the finally clause would raise NameError
            # and hide the real error whenever accept() itself fails.
            self.addCleanup(conn.close)
            # recv() is called before the client sends its message
            # (see the sleep in _testInheritFlags): it only succeeds
            # if the accepted socket is blocking.
            conn.recv(len(MSG))
        finally:
            self.serv.settimeout(None)

    def _testInheritFlags(self):
        time.sleep(0.1)
        self.cli.connect((HOST, self.port))
        time.sleep(0.5)
        self.cli.send(MSG)

    def testAccept(self):
        # Testing non-blocking accept
        self.serv.setblocking(False)

        # connect() didn't start: non-blocking accept() fails
        with self.assertRaises(BlockingIOError):
            self.serv.accept()

        # Let _testAccept() go ahead and connect.
        self.event.set()

        read, write, err = select.select([self.serv], [], [], MAIN_TIMEOUT)
        if self.serv not in read:
            self.fail("Error trying to do accept after select.")

        # connect() completed: non-blocking accept() doesn't block
        conn, addr = self.serv.accept()
        self.addCleanup(conn.close)
        self.assertIsNone(conn.gettimeout())

    def _testAccept(self):
        # don't connect before event is set to check
        # that non-blocking accept() raises BlockingIOError
        self.event.wait()

        self.cli.connect((HOST, self.port))

    def testConnect(self):
        # Accept the connection made by _testConnect(), whose client
        # socket has a timeout set.
        conn, addr = self.serv.accept()
        conn.close()

    def _testConnect(self):
        self.cli.settimeout(10)
        self.cli.connect((HOST, self.port))

    def testRecv(self):
        # Testing non-blocking recv
        conn, addr = self.serv.accept()
        self.addCleanup(conn.close)
        conn.setblocking(False)

        # the peer didn't send data yet: non-blocking recv() fails
        with self.assertRaises(BlockingIOError):
            conn.recv(len(MSG))

        # Let _testRecv() go ahead and send the message.
        self.event.set()

        read, write, err = select.select([conn], [], [], MAIN_TIMEOUT)
        if conn not in read:
            self.fail("Error during select call to non-blocking socket.")

        # the peer has sent data by now: non-blocking recv() doesn't block
        msg = conn.recv(len(MSG))
        self.assertEqual(msg, MSG)

    def _testRecv(self):
        self.cli.connect((HOST, self.port))

        # don't send anything before event is set to check
        # that non-blocking recv() raises BlockingIOError
        self.event.wait()

        # send data: recv() will no longer block
        self.cli.sendall(MSG)
4018
@unittest.skipUnless(thread, 'Threading required for this test.')
class FileObjectClassTestCase(SocketConnectedTest):
    """Tests for the file objects created by socket.makefile().

    Two file objects take part in every test:

    self.read_file is made from the client connection in setUp().
    Reading from it gives the output of the server.

    self.write_file is made from the server connection in
    clientSetUp().  Writing to it sends output to the client.

    The class attributes below are the makefile() arguments and the
    messages exchanged; subclasses override them to repeat the tests
    with other settings.
    """

    # Arguments handed to makefile() for both file objects
    bufsize = -1 # Use default buffer size
    encoding = 'utf-8'
    errors = 'strict'
    newline = None

    # Mode of each file object, and the message in the form expected
    # by that mode
    read_mode = 'rb'
    read_msg = MSG
    write_mode = 'wb'
    write_msg = MSG

    def __init__(self, methodName='runTest'):
        SocketConnectedTest.__init__(self, methodName=methodName)

    def setUp(self):
        events = [threading.Event() for _ in range(4)]
        (self.evt1, self.evt2,
         self.serv_finished, self.cli_finished) = events
        SocketConnectedTest.setUp(self)
        makefile_options = dict(encoding=self.encoding,
                                errors=self.errors,
                                newline=self.newline)
        self.read_file = self.cli_conn.makefile(
            self.read_mode, self.bufsize, **makefile_options)

    def tearDown(self):
        # Release a client side waiting on serv_finished before closing.
        self.serv_finished.set()
        reader = self.read_file
        reader.close()
        self.assertTrue(reader.closed)
        self.read_file = None
        SocketConnectedTest.tearDown(self)

    def clientSetUp(self):
        SocketConnectedTest.clientSetUp(self)
        makefile_options = dict(encoding=self.encoding,
                                errors=self.errors,
                                newline=self.newline)
        self.write_file = self.serv_conn.makefile(
            self.write_mode, self.bufsize, **makefile_options)

    def clientTearDown(self):
        # Release a server side waiting on cli_finished before closing.
        self.cli_finished.set()
        writer = self.write_file
        writer.close()
        self.assertTrue(writer.closed)
        self.write_file = None
        SocketConnectedTest.clientTearDown(self)

    def _sendWriteMsg(self):
        # Helper for the _test* methods: write the whole message and
        # flush it out of the file object's buffer.
        self.write_file.write(self.write_msg)
        self.write_file.flush()

    def testReadAfterTimeout(self):
        # Issue #7322: once a read has timed out, the file object has
        # to refuse any further read.
        self.cli_conn.settimeout(1)
        self.read_file.read(3)
        # Only three units were written, so this read times out...
        with self.assertRaises(socket.timeout):
            self.read_file.read(1)
        # ...and the one after it is rejected outright.
        with self.assertRaises(OSError) as ctx:
            self.read_file.read(1)
        self.assertIn("cannot read from timed out object", str(ctx.exception))

    def _testReadAfterTimeout(self):
        self.write_file.write(self.write_msg[:3])
        self.write_file.flush()
        # Stay connected until the reading side is done.
        self.serv_finished.wait()

    def testSmallRead(self):
        # Read the message in two pieces: all but the last three units,
        # then the rest.
        head_size = len(self.read_msg) - 3
        head = self.read_file.read(head_size)
        tail = self.read_file.read(3)
        self.assertEqual(head + tail, self.read_msg)

    def _testSmallRead(self):
        self._sendWriteMsg()

    def testFullRead(self):
        # A read() without size goes on until EOF.
        received = self.read_file.read()
        self.assertEqual(received, self.read_msg)

    def _testFullRead(self):
        self.write_file.write(self.write_msg)
        self.write_file.close()

    def testUnbufferedRead(self):
        # Read one unit at a time until nothing more comes back.
        received = type(self.read_msg)()
        while True:
            piece = self.read_file.read(1)
            if not piece:
                break
            received += piece
        self.assertEqual(received, self.read_msg)

    def _testUnbufferedRead(self):
        self._sendWriteMsg()

    def testReadline(self):
        # The message is a single line.
        received = self.read_file.readline()
        self.assertEqual(received, self.read_msg)

    def _testReadline(self):
        self._sendWriteMsg()

    def testCloseAfterMakefile(self):
        # Closing the socket must not close the file made from it.
        self.cli_conn.close()
        # A read() without size goes on until EOF.
        received = self.read_file.read()
        self.assertEqual(received, self.read_msg)

    def _testCloseAfterMakefile(self):
        self._sendWriteMsg()

    def testMakefileAfterMakefileClose(self):
        # Closing the file must leave the socket usable.
        self.read_file.close()
        raw = self.cli_conn.recv(len(MSG))
        # recv() gives bytes; decode them when the message is text.
        received = raw.decode() if isinstance(self.read_msg, str) else raw
        self.assertEqual(received, self.read_msg)

    def _testMakefileAfterMakefileClose(self):
        self._sendWriteMsg()

    def testClosedAttr(self):
        self.assertFalse(self.read_file.closed)

    def _testClosedAttr(self):
        self.assertFalse(self.write_file.closed)

    def testAttributes(self):
        reader = self.read_file
        self.assertEqual(reader.mode, self.read_mode)
        self.assertEqual(reader.name, self.cli_conn.fileno())

    def _testAttributes(self):
        writer = self.write_file
        self.assertEqual(writer.mode, self.write_mode)
        self.assertEqual(writer.name, self.serv_conn.fileno())

    def testRealClose(self):
        # Once closed, neither the file nor the socket is usable.
        self.read_file.close()
        with self.assertRaises(ValueError):
            self.read_file.fileno()
        self.cli_conn.close()
        with self.assertRaises(OSError):
            self.cli_conn.getsockname()

    def _testRealClose(self):
        pass
4181
4182
class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):

    """Run the FileObjectClassTestCase tests again with bufsize==0.

    Only in this unbuffered case should it be possible to create a
    file object, read a line from it, create another file object and
    read another line from that one, without any data being lost in
    the buffer of the first file object.  Note that http.client relies
    on this when reading multiple requests from the same socket."""

    bufsize = 0 # Use unbuffered mode

    def testUnbufferedReadline(self):
        # One line through the current file object...
        first_line = self.read_file.readline()
        self.assertEqual(first_line, b"A. " + self.write_msg)
        # ...and the next one through a brand new file object.
        self.read_file = self.cli_conn.makefile('rb', 0)
        second_line = self.read_file.readline()
        self.assertEqual(second_line, b"B. " + self.write_msg)

    def _testUnbufferedReadline(self):
        for prefix in (b"A. ", b"B. "):
            self.write_file.write(prefix + self.write_msg)
        self.write_file.flush()

    def testMakefileClose(self):
        # While the file made from it is open, the socket stays usable
        # even after its own close()...
        self.cli_conn.close()
        received = self.cli_conn.recv(1024)
        self.assertEqual(received, self.read_msg)
        # ...and stops being usable when the file gets closed as well.
        self.read_file.close()
        with self.assertRaises(OSError):
            self.cli_conn.recv(1024)

    def _testMakefileClose(self):
        self.write_file.write(self.write_msg)
        self.write_file.flush()

    @support.refcount_test
    def testMakefileCloseSocketDestroy(self):
        # Closing the file is expected to drop exactly one reference to
        # the socket.
        before = sys.getrefcount(self.cli_conn)
        self.read_file.close()
        after = sys.getrefcount(self.cli_conn)
        self.assertEqual(before - 1, after)

    def _testMakefileCloseSocketDestroy(self):
        pass

    # Non-blocking ops
    # NOTE: `read_file` becomes non-blocking through a call to
    # `cli_conn.setblocking`, and `write_file` through a call to
    # `serv_conn.setblocking` (see setUp / clientSetUp).

    def testSmallReadNonBlocking(self):
        head_size = len(self.read_msg) - 3
        self.cli_conn.setblocking(False)
        # Nothing was sent so far: both ways of reading give None.
        self.assertIsNone(self.read_file.readinto(bytearray(10)))
        self.assertIsNone(self.read_file.read(head_size))
        # Tell the other side to send, then wait for it to be done.
        self.evt1.set()
        self.evt2.wait(1.0)
        head = self.read_file.read(head_size)
        if head is None:
            # Data not arrived (can happen under Windows), wait a bit
            time.sleep(0.5)
            head = self.read_file.read(head_size)
        tail_buf = bytearray(10)
        tail_size = self.read_file.readinto(tail_buf)
        self.assertEqual(tail_size, 3)
        self.assertEqual(head + tail_buf[:tail_size], self.read_msg)
        # Everything was consumed: reads give None once more.
        self.assertIsNone(self.read_file.readinto(bytearray(16)))
        self.assertIsNone(self.read_file.read(1))

    def _testSmallReadNonBlocking(self):
        self.evt1.wait(1.0)
        self.write_file.write(self.write_msg)
        self.write_file.flush()
        self.evt2.set()
        # The socket has to stay open until the server test is over,
        # otherwise system recv() will return 0 instead of EWOULDBLOCK.
        self.serv_finished.wait(5.0)

    def testWriteNonBlocking(self):
        self.cli_finished.wait(5.0)
        # Skipping is done here on behalf of the client thread: a
        # SkipTest exception raised over there would appear as a
        # failure.
        if self.serv_skipped:
            self.skipTest(self.serv_skipped)

    def _testWriteNonBlocking(self):
        self.serv_skipped = None
        self.serv_conn.setblocking(False)
        # Repeated large writes, meant to fill up the socket buffer.
        payload = b"x" * support.SOCK_MAX_SIZE
        attempts = 10
        # A chunk of data can be buffered, so the first write() succeeds
        written = self.write_file.write(payload)
        self.assertGreater(written, 0)
        for _ in range(attempts):
            written = self.write_file.write(payload)
            if written is None:
                # The write got refused: this is the outcome wanted
                break
            self.assertGreater(written, 0)
        else:
            # The expected conditions could not be established.  This is
            # not a failure in itself but, if it happens repeatedly, the
            # test should be fixed.
            self.serv_skipped = "failed to saturate the socket buffer"
4290
4291
class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase):
    """Run the FileObjectClassTestCase tests again with bufsize==1."""

    # Reading is default-buffered and writing is line-buffered
    bufsize = 1
4295
4296
class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):
    """Run the FileObjectClassTestCase tests again with bufsize==2."""

    # Small on purpose, to exercise the buffering code
    bufsize = 2
4300
4301
class UnicodeReadFileObjectClassTestCase(FileObjectClassTestCase):
    """socket.makefile() tests with a text reader and a binary writer."""

    # Reading side: text mode, so the expected message is a str
    read_mode = 'r'
    read_msg = str(MSG, 'utf-8')
    # Writing side: binary mode, the message is sent as bytes
    write_mode = 'wb'
    write_msg = MSG
    newline = ''
4310
4311
class UnicodeWriteFileObjectClassTestCase(FileObjectClassTestCase):
    """socket.makefile() tests with a binary reader and a text writer."""

    # Reading side: binary mode, the expected message is bytes
    read_mode = 'rb'
    read_msg = MSG
    # Writing side: text mode, so the message is sent as a str
    write_mode = 'w'
    write_msg = str(MSG, 'utf-8')
    newline = ''
4320
4321
class UnicodeReadWriteFileObjectClassTestCase(FileObjectClassTestCase):
    """socket.makefile() tests with text mode on both sides."""

    # Reading side: text mode, so the expected message is a str
    read_mode = 'r'
    read_msg = str(MSG, 'utf-8')
    # Writing side: text mode as well
    write_mode = 'w'
    write_msg = str(MSG, 'utf-8')
    newline = ''
4330
4331
class NetworkConnectionTest(object):
    """Mix-in whose client side connects with socket.create_connection()."""

    def clientSetUp(self):
        # self.port is not defined here: BasicTCPTest2 below inherits
        # from this class and from BasicTCPTest, which defines it.
        connection = socket.create_connection((HOST, self.port))
        # The same socket is made available under both names.
        self.cli = self.serv_conn = connection
4340
class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest):
    """Run the BasicTCPTest tests with the NetworkConnectionTest client
    set-up, to check that it does not break existing TCP functionality.
    """
4344
class NetworkConnectionNoServer(unittest.TestCase):
    # Connection attempts made while no server is listening.

    class MockSocket(socket.socket):
        """Socket whose connect() always reports a timeout."""

        def connect(self, *args):
            raise socket.timeout('timed out')

    @contextlib.contextmanager
    def mocked_socket_module(self):
        """Swap socket.socket for MockSocket inside the with block."""
        real_socket_class = socket.socket
        socket.socket = self.MockSocket
        try:
            yield
        finally:
            # Put the real class back even if the block raised.
            socket.socket = real_socket_class

    def test_connect(self):
        # A plain connect() to an unused port must be refused.
        unused_port = support.find_unused_port()
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.addCleanup(sock.close)
        with self.assertRaises(OSError) as cm:
            sock.connect((HOST, unused_port))
        self.assertEqual(cm.exception.errno, errno.ECONNREFUSED)

    def test_create_connection(self):
        # Issue #9792: errors raised by create_connection() should have
        # a proper errno attribute.
        unused_port = support.find_unused_port()
        with self.assertRaises(OSError) as cm:
            socket.create_connection((HOST, unused_port))

        # Issue #16257: create_connection() calls getaddrinfo() against
        # 'localhost'.  This may result in an IPV6 addr being returned
        # as well as an IPV4 one:
        #   >>> socket.getaddrinfo('localhost', port, 0, SOCK_STREAM)
        #   >>> [(2,  2, 0, '', ('127.0.0.1', 41230)),
        #        (26, 2, 0, '', ('::1', 41230, 0, 0))]
        #
        # create_connection() goes through all the addresses returned
        # and, when it cannot connect to any of them, propagates the
        # last exception it encountered.
        #
        # On Solaris, ENETUNREACH is returned in this circumstance
        # instead of ECONNREFUSED, and bpo-31910 reports random
        # EADDRNOTAVAIL failures on Travis CI.  Both are accepted
        # wherever the errno module defines them.
        expected_errnos = [errno.ECONNREFUSED]
        for optional_name in ('ENETUNREACH', 'EADDRNOTAVAIL'):
            if hasattr(errno, optional_name):
                expected_errnos.append(getattr(errno, optional_name))

        self.assertIn(cm.exception.errno, expected_errnos)

    def test_create_connection_timeout(self):
        # Issue #9792: create_connection() should not recast timeout errors
        # as generic socket errors.
        with self.mocked_socket_module():
            with self.assertRaises(socket.timeout):
                socket.create_connection((HOST, 1234))
4406
4407
@unittest.skipUnless(thread, 'Threading required for this test.')
class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
    # Check attributes of the sockets returned by
    # socket.create_connection().  The test* half of every test only
    # accepts the connection (see _justAccept); the checks themselves
    # are made in the _test* methods.

    def __init__(self, methodName='runTest'):
        SocketTCPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        # Port used as local (source) port by _testSourceAddress().
        self.source_port = support.find_unused_port()

    def clientTearDown(self):
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)

    def _justAccept(self):
        # Accept one connection and close it at once.
        conn, addr = self.serv.accept()
        conn.close()

    testFamily = _justAccept
    def _testFamily(self):
        self.cli = socket.create_connection((HOST, self.port), timeout=30)
        self.addCleanup(self.cli.close)
        # Named constant rather than its usual numeric value (2).
        self.assertEqual(self.cli.family, socket.AF_INET)

    testSourceAddress = _justAccept
    def _testSourceAddress(self):
        self.cli = socket.create_connection((HOST, self.port), timeout=30,
                source_address=('', self.source_port))
        self.addCleanup(self.cli.close)
        self.assertEqual(self.cli.getsockname()[1], self.source_port)
        # The port number being used is sufficient to show that the bind()
        # call happened.

    testTimeoutDefault = _justAccept
    def _testTimeoutDefault(self):
        # passing no explicit timeout uses socket's global default
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(42)
        try:
            self.cli = socket.create_connection((HOST, self.port))
            self.addCleanup(self.cli.close)
        finally:
            # The global default must not leak into other tests.
            socket.setdefaulttimeout(None)
        self.assertEqual(self.cli.gettimeout(), 42)

    testTimeoutNone = _justAccept
    def _testTimeoutNone(self):
        # None timeout means the same as sock.settimeout(None)
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            self.cli = socket.create_connection((HOST, self.port), timeout=None)
            self.addCleanup(self.cli.close)
        finally:
            # The global default must not leak into other tests.
            socket.setdefaulttimeout(None)
        self.assertIsNone(self.cli.gettimeout())

    testTimeoutValueNamed = _justAccept
    def _testTimeoutValueNamed(self):
        self.cli = socket.create_connection((HOST, self.port), timeout=30)
        # Same cleanup as in the other _test* methods of this class.
        self.addCleanup(self.cli.close)
        self.assertEqual(self.cli.gettimeout(), 30)

    testTimeoutValueNonamed = _justAccept
    def _testTimeoutValueNonamed(self):
        # Here the timeout is passed as a positional argument.
        self.cli = socket.create_connection((HOST, self.port), 30)
        self.addCleanup(self.cli.close)
        self.assertEqual(self.cli.gettimeout(), 30)
4476
@unittest.skipUnless(thread, 'Threading required for this test.')
class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest):
    # Behaviour of create_connection() sockets when the peer takes
    # three seconds to answer.

    def __init__(self, methodName='runTest'):
        SocketTCPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        pass

    def clientTearDown(self):
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)

    def testInsideTimeout(self):
        # Accept, stay silent for three seconds, then answer.
        accepted, _peer = self.serv.accept()
        self.addCleanup(accepted.close)
        time.sleep(3)
        accepted.send(b"done!")
    # Both tests share the same server side.
    testOutsideTimeout = testInsideTimeout

    def _testInsideTimeout(self):
        # No timeout is given here: recv() waits for the late answer.
        sock = socket.create_connection((HOST, self.port))
        self.cli = sock
        answer = sock.recv(5)
        self.assertEqual(answer, b"done!")

    def _testOutsideTimeout(self):
        # One second of timeout is less than the delay of the answer.
        sock = socket.create_connection((HOST, self.port), timeout=1)
        self.cli = sock
        with self.assertRaises(socket.timeout):
            sock.recv(5)
4507
4508
class TCPTimeoutTest(SocketTCPTest):
    # Timeout behaviour of accept() on a listening TCP socket to which
    # these tests never connect.

    def testTCPTimeout(self):
        # accept() must raise socket.timeout once the timeout expires.
        # The message is given through msg= so that it is really used
        # as the failure message.
        self.serv.settimeout(1.0)
        with self.assertRaises(
                socket.timeout,
                msg="Error generating a timeout exception (TCP)"):
            self.serv.accept()

    def testTimeoutZero(self):
        # A zero timeout means non-blocking mode: accept() must fail
        # with an OSError that is not socket.timeout.
        self.serv.settimeout(0.0)
        with self.assertRaises(
                OSError,
                msg="accept() returned success when we did not expect it"
                ) as cm:
            self.serv.accept()
        self.assertNotIsInstance(cm.exception, socket.timeout,
                                 "caught timeout instead of error (TCP)")

    @unittest.skipUnless(hasattr(signal, 'alarm'),
                         'test needs signal.alarm()')
    def testInterruptedTimeout(self):
        # XXX I don't know how to do this test on MSWindows or any other
        # platform that doesn't support signal.alarm() or os.kill(), though
        # the bug should have existed on all platforms.
        self.serv.settimeout(5.0)   # must be longer than alarm
        class Alarm(Exception):
            pass
        def alarm_handler(signum, frame):
            # Raised in the main thread while accept() is waiting.
            raise Alarm
        old_alarm = signal.signal(signal.SIGALRM, alarm_handler)
        try:
            try:
                signal.alarm(2)    # POSIX allows alarm to be up to 1 second early
                self.serv.accept()
            except socket.timeout:
                self.fail("caught timeout instead of Alarm")
            except Alarm:
                pass
            except Exception:
                # Not a bare except: KeyboardInterrupt and SystemExit
                # have to propagate instead of being reported as a
                # test failure.
                self.fail("caught other exception instead of Alarm:"
                          " %s(%s):\n%s" %
                          (sys.exc_info()[:2] + (traceback.format_exc(),)))
            else:
                self.fail("nothing caught")
            finally:
                signal.alarm(0)         # shut off alarm
        except Alarm:
            # The alarm went off outside of the accept() call.
            self.fail("got Alarm in wrong place")
        finally:
            # no alarm can be pending.  Safe to restore old handler.
            signal.signal(signal.SIGALRM, old_alarm)
4565
class UDPTimeoutTest(SocketUDPTest):
    # Timeout behaviour of recv() on a UDP socket to which these tests
    # never send anything.

    def testUDPTimeout(self):
        # recv() must raise socket.timeout once the timeout expires.
        # The message is given through msg= so that it is really used
        # as the failure message.
        self.serv.settimeout(1.0)
        with self.assertRaises(
                socket.timeout,
                msg="Error generating a timeout exception (UDP)"):
            self.serv.recv(1024)

    def testTimeoutZero(self):
        # A zero timeout means non-blocking mode: recv() must fail
        # with an OSError that is not socket.timeout.
        self.serv.settimeout(0.0)
        with self.assertRaises(
                OSError,
                msg="recv() returned success when we did not expect it"
                ) as cm:
            self.serv.recv(1024)
        self.assertNotIsInstance(cm.exception, socket.timeout,
                                 "caught timeout instead of error (UDP)")
4588
class TestExceptions(unittest.TestCase):
    """Checks on the socket exception hierarchy and on error reporting."""

    def testExceptionTree(self):
        # (derived, base) pairs that must hold for the socket exceptions.
        hierarchy = (
            (OSError, Exception),
            (socket.herror, OSError),
            (socket.gaierror, OSError),
            (socket.timeout, OSError),
        )
        for derived, base in hierarchy:
            self.assertTrue(issubclass(derived, base))

    def test_setblocking_invalidfd(self):
        # Regression test for issue #28471

        original = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        fd = original.fileno()
        stale = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0, fd)
        # Closing the first object closes the descriptor that the second
        # object still refers to.
        original.close()
        # Clean up with detach() rather than close().
        self.addCleanup(stale.detach)

        self.assertRaises(OSError, stale.setblocking, False)
4608
4609
@unittest.skipUnless(sys.platform == 'linux', 'Linux specific test')
class TestLinuxAbstractNamespace(unittest.TestCase):
    """AF_UNIX addresses that start with a NUL byte (abstract names)."""

    # Length limit used by the name-size tests below.
    UNIX_PATH_MAX = 108

    def testLinuxAbstractNamespace(self):
        name = b"\x00python-test-hello\x00\xff"
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as listener:
            listener.bind(name)
            listener.listen()
            with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
                client.connect(listener.getsockname())
                accepted, _ = listener.accept()
                with accepted:
                    # Both ends must report the name unchanged, embedded
                    # NUL and non-ASCII byte included.
                    self.assertEqual(listener.getsockname(), name)
                    self.assertEqual(client.getpeername(), name)

    def testMaxName(self):
        # Leading NUL plus padding: UNIX_PATH_MAX bytes in total.
        name = b"\x00" + b"h" * (self.UNIX_PATH_MAX - 1)
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
            sock.bind(name)
            self.assertEqual(sock.getsockname(), name)

    def testNameOverflow(self):
        # One character more than testMaxName uses; bind() must refuse it.
        name = "\x00" + "h" * self.UNIX_PATH_MAX
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
            with self.assertRaises(OSError):
                sock.bind(name)

    def testStrName(self):
        # Check that an abstract name can be passed as a string.
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
            sock.bind("\x00python\x00test\x00")
            self.assertEqual(sock.getsockname(), b"\x00python\x00test\x00")

    def testBytearrayName(self):
        # Check that an abstract name can be passed as a bytearray.
        name = bytearray(b"\x00python\x00test\x00")
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
            sock.bind(name)
            self.assertEqual(sock.getsockname(), b"\x00python\x00test\x00")
4651
4652@unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'test needs socket.AF_UNIX')
4653class TestUnixDomain(unittest.TestCase):
4654
4655    def setUp(self):
4656        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
4657
4658    def tearDown(self):
4659        self.sock.close()
4660
4661    def encoded(self, path):
4662        # Return the given path encoded in the file system encoding,
4663        # or skip the test if this is not possible.
4664        try:
4665            return os.fsencode(path)
4666        except UnicodeEncodeError:
4667            self.skipTest(
4668                "Pathname {0!a} cannot be represented in file "
4669                "system encoding {1!r}".format(
4670                    path, sys.getfilesystemencoding()))
4671
4672    def bind(self, sock, path):
4673        # Bind the socket
4674        try:
4675            support.bind_unix_socket(sock, path)
4676        except OSError as e:
4677            if str(e) == "AF_UNIX path too long":
4678                self.skipTest(
4679                    "Pathname {0!a} is too long to serve as an AF_UNIX path"
4680                    .format(path))
4681            else:
4682                raise
4683
4684    def testUnbound(self):
4685        # Issue #30205
4686        self.assertIn(self.sock.getsockname(), ('', None))
4687
4688    def testStrAddr(self):
4689        # Test binding to and retrieving a normal string pathname.
4690        path = os.path.abspath(support.TESTFN)
4691        self.bind(self.sock, path)
4692        self.addCleanup(support.unlink, path)
4693        self.assertEqual(self.sock.getsockname(), path)
4694
4695    def testBytesAddr(self):
4696        # Test binding to a bytes pathname.
4697        path = os.path.abspath(support.TESTFN)
4698        self.bind(self.sock, self.encoded(path))
4699        self.addCleanup(support.unlink, path)
4700        self.assertEqual(self.sock.getsockname(), path)
4701
4702    def testSurrogateescapeBind(self):
4703        # Test binding to a valid non-ASCII pathname, with the
4704        # non-ASCII bytes supplied using surrogateescape encoding.
4705        path = os.path.abspath(support.TESTFN_UNICODE)
4706        b = self.encoded(path)
4707        self.bind(self.sock, b.decode("ascii", "surrogateescape"))
4708        self.addCleanup(support.unlink, path)
4709        self.assertEqual(self.sock.getsockname(), path)
4710
4711    def testUnencodableAddr(self):
4712        # Test binding to a pathname that cannot be encoded in the
4713        # file system encoding.
4714        if support.TESTFN_UNENCODABLE is None:
4715            self.skipTest("No unencodable filename available")
4716        path = os.path.abspath(support.TESTFN_UNENCODABLE)
4717        self.bind(self.sock, path)
4718        self.addCleanup(support.unlink, path)
4719        self.assertEqual(self.sock.getsockname(), path)
4720
@unittest.skipUnless(thread, 'Threading required for this test.')
class BufferIOTest(SocketConnectedTest):
    """
    Exercise recv_into() and recvfrom_into() with several kinds of
    writable buffer.  Each ``testX`` method reads from cli_conn what the
    matching ``_testX`` method sends on serv_conn.
    """
    def __init__(self, methodName='runTest'):
        SocketConnectedTest.__init__(self, methodName=methodName)

    def testRecvIntoArray(self):
        expected_len = len(MSG)
        target = array.array("B", [0] * expected_len)
        received = self.cli_conn.recv_into(target)
        self.assertEqual(received, expected_len)
        self.assertEqual(target.tobytes()[:expected_len], MSG)

    def _testRecvIntoArray(self):
        self.serv_conn.send(bytes(MSG))

    def testRecvIntoBytearray(self):
        target = bytearray(1024)
        received = self.cli_conn.recv_into(target)
        self.assertEqual(received, len(MSG))
        self.assertEqual(target[:len(MSG)], MSG)

    _testRecvIntoBytearray = _testRecvIntoArray

    def testRecvIntoMemoryview(self):
        target = bytearray(1024)
        view = memoryview(target)
        received = self.cli_conn.recv_into(view)
        self.assertEqual(received, len(MSG))
        self.assertEqual(target[:len(MSG)], MSG)

    _testRecvIntoMemoryview = _testRecvIntoArray

    def testRecvFromIntoArray(self):
        expected_len = len(MSG)
        target = array.array("B", [0] * expected_len)
        received, addr = self.cli_conn.recvfrom_into(target)
        self.assertEqual(received, expected_len)
        self.assertEqual(target.tobytes()[:expected_len], MSG)

    def _testRecvFromIntoArray(self):
        self.serv_conn.send(bytes(MSG))

    def testRecvFromIntoBytearray(self):
        target = bytearray(1024)
        received, addr = self.cli_conn.recvfrom_into(target)
        self.assertEqual(received, len(MSG))
        self.assertEqual(target[:len(MSG)], MSG)

    _testRecvFromIntoBytearray = _testRecvFromIntoArray

    def testRecvFromIntoMemoryview(self):
        target = bytearray(1024)
        view = memoryview(target)
        received, addr = self.cli_conn.recvfrom_into(view)
        self.assertEqual(received, len(MSG))
        self.assertEqual(target[:len(MSG)], MSG)

    _testRecvFromIntoMemoryview = _testRecvFromIntoArray

    def testRecvFromIntoSmallBuffer(self):
        # See issue #20246.
        # Asking for more bytes than the buffer can hold must be refused.
        tiny = bytearray(8)
        with self.assertRaises(ValueError):
            self.cli_conn.recvfrom_into(tiny, 1024)

    def _testRecvFromIntoSmallBuffer(self):
        self.serv_conn.send(MSG)

    def testRecvFromIntoEmptyBuffer(self):
        # A zero-length buffer must be accepted, with and without an
        # explicit byte count of 0.
        empty = bytearray()
        self.cli_conn.recvfrom_into(empty)
        self.cli_conn.recvfrom_into(empty, 0)

    _testRecvFromIntoEmptyBuffer = _testRecvFromIntoArray
4803
4804
# Address components shared by the TIPC tests below.  TIPC_STYPE is the
# second element of every TIPC address tuple they build; TIPC_LOWER and
# TIPC_UPPER are the bounds the servers bind to with TIPC_ADDR_NAMESEQ,
# and the clients address the midpoint of that range.
TIPC_STYPE = 2000
TIPC_LOWER = 200
TIPC_UPPER = 210
4808
def isTipcAvailable():
    """Return True if the running kernel has the TIPC module loaded.

    The answer is False when the socket module has no AF_TIPC, when
    /proc/modules cannot be opened, or when no line of it starts with
    "tipc ".  The TIPC module is not loaded automatically on Ubuntu and
    probably other Linux distros.
    """
    if hasattr(socket, "AF_TIPC"):
        try:
            modules = open("/proc/modules")
        except (FileNotFoundError, IsADirectoryError, PermissionError):
            # It's ok if the file does not exist, is a directory or if we
            # have not the permission to read it.
            return False
        with modules:
            return any(entry.startswith("tipc ") for entry in modules)
    return False
4828
@unittest.skipUnless(isTipcAvailable(),
                     "TIPC module is not loaded, please 'sudo modprobe tipc'")
class TIPCTest(unittest.TestCase):
    """Single-process TIPC test using SOCK_RDM sockets."""

    def testRDM(self):
        server = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
        client = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
        self.addCleanup(server.close)
        self.addCleanup(client.close)

        # The server binds to the whole [TIPC_LOWER, TIPC_UPPER] range.
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
                     TIPC_LOWER, TIPC_UPPER))

        # The client addresses the midpoint of that range.
        midpoint = TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) // 2
        client.sendto(MSG, (socket.TIPC_ADDR_NAME, TIPC_STYPE, midpoint, 0))

        payload, sender = server.recvfrom(1024)

        self.assertEqual(client.getsockname(), sender)
        self.assertEqual(payload, MSG)
4851
4852
@unittest.skipUnless(isTipcAvailable(),
                     "TIPC module is not loaded, please 'sudo modprobe tipc'")
class TIPCThreadableTest(unittest.TestCase, ThreadableTest):
    """TIPC SOCK_STREAM test with separate server and client halves."""

    def __init__(self, methodName='runTest'):
        unittest.TestCase.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def setUp(self):
        server = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
        self.srv = server
        self.addCleanup(server.close)
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
                     TIPC_LOWER, TIPC_UPPER))
        server.listen()
        # Signal readiness before blocking in accept().
        self.serverExplicitReady()
        self.conn, self.connaddr = server.accept()
        self.addCleanup(self.conn.close)

    def clientSetUp(self):
        # There is a hittable race between serverExplicitReady() and the
        # accept() call; sleep a little while to avoid it, otherwise
        # we could get an exception
        time.sleep(0.1)
        client = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
        self.cli = client
        self.addCleanup(client.close)
        # Connect to the midpoint of the range the server is bound to.
        midpoint = TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) // 2
        client.connect((socket.TIPC_ADDR_NAME, TIPC_STYPE, midpoint, 0))
        self.cliaddr = client.getsockname()

    def testStream(self):
        received = self.conn.recv(1024)
        self.assertEqual(received, MSG)
        # The address accept() reported must be the client's own address.
        self.assertEqual(self.cliaddr, self.connaddr)

    def _testStream(self):
        self.cli.send(MSG)
        self.cli.close()
4892
4893
@unittest.skipUnless(thread, 'Threading required for this test.')
class ContextManagersTest(ThreadedTCPSocketTest):
    """Check that sockets close themselves when used as context managers,
    whether created directly or through socket.create_connection().

    Each scenario is a ``testX`` / ``_testX`` pair; the ``_testX`` half is
    the one that connects to self.serv.
    """

    def testSocketClass(self):
        # These checks previously lived only in _testSocketClass, which
        # has no "test" prefix and was therefore never collected.  They
        # need no peer, so they run here and the paired method is a no-op
        # (same arrangement as test_errors/_test_errors elsewhere in this
        # file).
        # base test
        with socket.socket() as sock:
            self.assertFalse(sock._closed)
        self.assertTrue(sock._closed)
        # close inside with block
        with socket.socket() as sock:
            sock.close()
        self.assertTrue(sock._closed)
        # exception inside with block
        with socket.socket() as sock:
            self.assertRaises(OSError, sock.sendall, b'foo')
        self.assertTrue(sock._closed)

    def _testSocketClass(self):
        pass

    def _echoOnce(self):
        # Accept one connection and send back whatever the first recv()
        # returned (possibly nothing, if the peer closed immediately).
        conn, addr = self.serv.accept()
        self.addCleanup(conn.close)
        data = conn.recv(1024)
        conn.sendall(data)

    def testCreateConnectionBase(self):
        self._echoOnce()

    def _testCreateConnectionBase(self):
        address = self.serv.getsockname()
        with socket.create_connection(address) as sock:
            self.assertFalse(sock._closed)
            sock.sendall(b'foo')
            self.assertEqual(sock.recv(1024), b'foo')
        self.assertTrue(sock._closed)

    def testCreateConnectionClose(self):
        self._echoOnce()

    def _testCreateConnectionClose(self):
        address = self.serv.getsockname()
        # Closing explicitly inside the block must not upset __exit__.
        with socket.create_connection(address) as sock:
            sock.close()
        self.assertTrue(sock._closed)
        self.assertRaises(OSError, sock.sendall, b'foo')
4937
4938
class InheritanceTest(unittest.TestCase):
    """Checks on the inheritable flag of socket file descriptors."""

    @unittest.skipUnless(hasattr(socket, "SOCK_CLOEXEC"),
                         "SOCK_CLOEXEC not defined")
    @support.requires_linux_version(2, 6, 28)
    def test_SOCK_CLOEXEC(self):
        sock_type = socket.SOCK_STREAM | socket.SOCK_CLOEXEC
        with socket.socket(socket.AF_INET, sock_type) as s:
            self.assertTrue(s.type & socket.SOCK_CLOEXEC)
            self.assertFalse(s.get_inheritable())

    def test_default_inheritable(self):
        # A freshly created socket is not inheritable.
        with socket.socket() as sock:
            self.assertEqual(sock.get_inheritable(), False)

    def test_dup(self):
        # The duplicate must be non-inheritable as well.
        with socket.socket() as original:
            duplicate = original.dup()
            original.close()
            with duplicate:
                self.assertEqual(duplicate.get_inheritable(), False)

    def test_set_inheritable(self):
        with socket.socket() as sock:
            # Toggle the flag on, then off again.
            for wanted in (True, False):
                sock.set_inheritable(wanted)
                self.assertEqual(sock.get_inheritable(), wanted)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        with socket.socket() as sock:
            fd = sock.fileno()
            self.assertEqual(sock.get_inheritable(), False)

            # clear FD_CLOEXEC flag
            cleared = fcntl.fcntl(fd, fcntl.F_GETFD) & ~fcntl.FD_CLOEXEC
            fcntl.fcntl(fd, fcntl.F_SETFD, cleared)

            self.assertEqual(sock.get_inheritable(), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        with socket.socket() as sock:
            fd = sock.fileno()
            # FD_CLOEXEC is set on a new socket ...
            cloexec_bit = fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC
            self.assertEqual(cloexec_bit, fcntl.FD_CLOEXEC)

            # ... and cleared once the socket is made inheritable.
            sock.set_inheritable(True)
            cloexec_bit = fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC
            self.assertEqual(cloexec_bit, 0)


    @unittest.skipUnless(hasattr(socket, "socketpair"),
                         "need socket.socketpair()")
    def test_socketpair(self):
        first, second = socket.socketpair()
        self.addCleanup(first.close)
        self.addCleanup(second.close)
        self.assertEqual(first.get_inheritable(), False)
        self.assertEqual(second.get_inheritable(), False)
5006
5007
@unittest.skipUnless(hasattr(socket, "SOCK_NONBLOCK"),
                     "SOCK_NONBLOCK not defined")
class NonblockConstantTest(unittest.TestCase):
    """Check how SOCK_NONBLOCK in socket.type tracks the socket timeout."""

    def checkNonblock(self, s, nonblock=True, timeout=0.0):
        """Assert the blocking state of socket *s*.

        With *nonblock* true, s.type must carry SOCK_NONBLOCK and
        s.gettimeout() must equal *timeout*; otherwise the flag must be
        absent and the timeout must be None.
        """
        if nonblock:
            self.assertTrue(s.type & socket.SOCK_NONBLOCK)
            self.assertEqual(s.gettimeout(), timeout)
        else:
            self.assertFalse(s.type & socket.SOCK_NONBLOCK)
            self.assertEqual(s.gettimeout(), None)

    @support.requires_linux_version(2, 6, 28)
    def test_SOCK_NONBLOCK(self):
        # a lot of it seems silly and redundant, but I wanted to test that
        # changing back and forth worked ok
        with socket.socket(socket.AF_INET,
                           socket.SOCK_STREAM | socket.SOCK_NONBLOCK) as s:
            self.checkNonblock(s)
            s.setblocking(1)
            self.checkNonblock(s, False)
            s.setblocking(0)
            self.checkNonblock(s)
            s.settimeout(None)
            self.checkNonblock(s, False)
            s.settimeout(2.0)
            self.checkNonblock(s, timeout=2.0)
            s.setblocking(1)
            self.checkNonblock(s, False)
        # defaulttimeout
        # The default timeout is global to the socket module, so it is
        # restored in a finally clause: a failing check must not leave a
        # modified default behind for later tests.
        t = socket.getdefaulttimeout()
        try:
            socket.setdefaulttimeout(0.0)
            with socket.socket() as s:
                self.checkNonblock(s)
            socket.setdefaulttimeout(None)
            with socket.socket() as s:
                self.checkNonblock(s, False)
            socket.setdefaulttimeout(2.0)
            with socket.socket() as s:
                self.checkNonblock(s, timeout=2.0)
            socket.setdefaulttimeout(None)
            with socket.socket() as s:
                self.checkNonblock(s, False)
        finally:
            socket.setdefaulttimeout(t)
5051
5052
@unittest.skipUnless(os.name == "nt", "Windows specific")
@unittest.skipUnless(multiprocessing, "need multiprocessing")
class TestSocketSharing(SocketTCPTest):
    """Tests for socket.share() and socket.fromshare()."""

    # This must be classmethod and not staticmethod or multiprocessing
    # won't be able to bootstrap it.
    @classmethod
    def remoteProcessServer(cls, q):
        """Child-process entry point.

        Reads the shared-socket data and then the message from queue *q*,
        accepts one connection on the recreated socket and sends the
        message over it.
        """
        # Recreate socket from shared data
        sdata = q.get()
        message = q.get()

        # Context managers close both sockets even if accept() or
        # sendall() raises.
        with socket.fromshare(sdata) as s:
            s2, c = s.accept()
            with s2:
                # Send the message
                s2.sendall(message)

    def testShare(self):
        # Transfer the listening server socket to another process
        # and service it from there.

        # Create process:
        q = multiprocessing.Queue()
        p = multiprocessing.Process(target=self.remoteProcessServer, args=(q,))
        p.start()

        # Get the shared socket data
        share_data = self.serv.share(p.pid)

        # Pass the shared socket to the other process
        addr = self.serv.getsockname()
        self.serv.close()
        q.put(share_data)

        # The data that the server will send us
        message = b"slapmahfro"
        q.put(message)

        # Connect; the socket is closed even if recv() raises.
        m = []
        with socket.create_connection(addr) as s:
            #  listen for the data
            while True:
                chunk = s.recv(100)
                if not chunk:
                    break
                m.append(chunk)
        received = b"".join(m)
        # Reap the child before asserting, so that a mismatch does not
        # leave the process un-joined.  recv() has already seen
        # end-of-stream at this point.
        p.join()
        self.assertEqual(received, message)

    def testShareLength(self):
        # Share data that is one byte short or three bytes long is
        # rejected.
        data = self.serv.share(os.getpid())
        self.assertRaises(ValueError, socket.fromshare, data[:-1])
        self.assertRaises(ValueError, socket.fromshare, data+b"foo")

    def compareSockets(self, org, other):
        # socket sharing is expected to work only for blocking socket
        # since the internal python timeout value isn't transferred.
        self.assertEqual(org.gettimeout(), None)
        self.assertEqual(org.gettimeout(), other.gettimeout())

        self.assertEqual(org.family, other.family)
        self.assertEqual(org.type, other.type)
        # If the user specified "0" for proto, then
        # internally windows will have picked the correct value.
        # Python introspection on the socket however will still return
        # 0.  For the shared socket, the python value is recreated
        # from the actual value, so it may not compare correctly.
        if org.proto != 0:
            self.assertEqual(org.proto, other.proto)

    def testShareLocal(self):
        data = self.serv.share(os.getpid())
        with socket.fromshare(data) as s:
            self.compareSockets(self.serv, s)

    def testTypes(self):
        families = [socket.AF_INET, socket.AF_INET6]
        types = [socket.SOCK_STREAM, socket.SOCK_DGRAM]
        for f in families:
            for t in types:
                try:
                    source = socket.socket(f, t)
                except OSError:
                    continue # This combination is not supported
                with source:
                    data = source.share(os.getpid())
                    with socket.fromshare(data) as shared:
                        self.compareSockets(source, shared)
5154
5155
@unittest.skipUnless(thread, 'Threading required for this test.')
class SendfileUsingSendTest(ThreadedTCPSocketTest):
    """
    Test the send() implementation of socket.sendfile().

    Each scenario is a pair of methods: ``_testX`` connects to the
    listening socket and sends (part of) a file, ``testX`` accepts the
    connection and checks what arrived.
    """

    FILESIZE = (10 * 1024 * 1024)  # 10MB
    BUFSIZE = 8192
    FILEDATA = b""      # contents of the test file, filled in by setUpClass
    TIMEOUT = 2         # seconds; used for accept() and for the connections

    @classmethod
    def setUpClass(cls):
        """Create a FILESIZE-byte test file and cache its contents."""
        def chunks(total, step):
            # Yield write sizes adding up to exactly `total`.
            assert total >= step
            while total > step:
                yield step
                total -= step
            if total:
                yield total

        chunk = b"".join([random.choice(string.ascii_letters).encode()
                          for i in range(cls.BUFSIZE)])
        with open(support.TESTFN, 'wb') as f:
            for csize in chunks(cls.FILESIZE, cls.BUFSIZE):
                # Honour the requested size so that the last write is
                # short when FILESIZE is not a multiple of BUFSIZE.
                f.write(chunk[:csize])
        with open(support.TESTFN, 'rb') as f:
            cls.FILEDATA = f.read()
            assert len(cls.FILEDATA) == cls.FILESIZE

    @classmethod
    def tearDownClass(cls):
        support.unlink(support.TESTFN)

    def accept_conn(self):
        """Accept one connection, with TIMEOUT applied to both the
        listening socket and the returned connection."""
        self.serv.settimeout(self.TIMEOUT)
        conn, addr = self.serv.accept()
        conn.settimeout(self.TIMEOUT)
        self.addCleanup(conn.close)
        return conn

    def recv_data(self, conn):
        """Read from *conn* until end-of-stream and return all the bytes."""
        received = []
        while True:
            chunk = conn.recv(self.BUFSIZE)
            if not chunk:
                break
            received.append(chunk)
        return b''.join(received)

    def meth_from_sock(self, sock):
        # Depending on the mixin class being run return either send()
        # or sendfile() method implementation.
        return sock._sendfile_use_send

    # The sending methods below open the file first, in an outer "with",
    # so that it is closed even when create_connection() fails.

    # regular file

    def _testRegularFile(self):
        address = self.serv.getsockname()
        with open(support.TESTFN, 'rb') as file:
            with socket.create_connection(address) as sock:
                meth = self.meth_from_sock(sock)
                sent = meth(file)
                self.assertEqual(sent, self.FILESIZE)
                self.assertEqual(file.tell(), self.FILESIZE)

    def testRegularFile(self):
        conn = self.accept_conn()
        data = self.recv_data(conn)
        self.assertEqual(len(data), self.FILESIZE)
        self.assertEqual(data, self.FILEDATA)

    # non regular file

    def _testNonRegularFile(self):
        address = self.serv.getsockname()
        with io.BytesIO(self.FILEDATA) as file:
            with socket.create_connection(address) as sock:
                sent = sock.sendfile(file)
                self.assertEqual(sent, self.FILESIZE)
                self.assertEqual(file.tell(), self.FILESIZE)
                # The sendfile()-based helper must give up on an
                # in-memory file object.
                self.assertRaises(socket._GiveupOnSendfile,
                                  sock._sendfile_use_sendfile, file)

    def testNonRegularFile(self):
        conn = self.accept_conn()
        data = self.recv_data(conn)
        self.assertEqual(len(data), self.FILESIZE)
        self.assertEqual(data, self.FILEDATA)

    # empty file

    def _testEmptyFileSend(self):
        address = self.serv.getsockname()
        filename = support.TESTFN + "2"
        with open(filename, 'wb'):
            self.addCleanup(support.unlink, filename)
        with open(filename, 'rb') as file:
            with socket.create_connection(address) as sock:
                meth = self.meth_from_sock(sock)
                sent = meth(file)
                self.assertEqual(sent, 0)
                self.assertEqual(file.tell(), 0)

    def testEmptyFileSend(self):
        conn = self.accept_conn()
        data = self.recv_data(conn)
        self.assertEqual(data, b"")

    # offset

    def _testOffset(self):
        address = self.serv.getsockname()
        with open(support.TESTFN, 'rb') as file:
            with socket.create_connection(address) as sock:
                meth = self.meth_from_sock(sock)
                sent = meth(file, offset=5000)
                self.assertEqual(sent, self.FILESIZE - 5000)
                self.assertEqual(file.tell(), self.FILESIZE)

    def testOffset(self):
        conn = self.accept_conn()
        data = self.recv_data(conn)
        self.assertEqual(len(data), self.FILESIZE - 5000)
        self.assertEqual(data, self.FILEDATA[5000:])

    # count

    def _testCount(self):
        address = self.serv.getsockname()
        with open(support.TESTFN, 'rb') as file:
            with socket.create_connection(address,
                                          timeout=self.TIMEOUT) as sock:
                count = 5000007
                meth = self.meth_from_sock(sock)
                sent = meth(file, count=count)
                self.assertEqual(sent, count)
                self.assertEqual(file.tell(), count)

    def testCount(self):
        count = 5000007
        conn = self.accept_conn()
        data = self.recv_data(conn)
        self.assertEqual(len(data), count)
        self.assertEqual(data, self.FILEDATA[:count])

    # count small

    def _testCountSmall(self):
        address = self.serv.getsockname()
        with open(support.TESTFN, 'rb') as file:
            with socket.create_connection(address,
                                          timeout=self.TIMEOUT) as sock:
                count = 1
                meth = self.meth_from_sock(sock)
                sent = meth(file, count=count)
                self.assertEqual(sent, count)
                self.assertEqual(file.tell(), count)

    def testCountSmall(self):
        count = 1
        conn = self.accept_conn()
        data = self.recv_data(conn)
        self.assertEqual(len(data), count)
        self.assertEqual(data, self.FILEDATA[:count])

    # count + offset

    def _testCountWithOffset(self):
        address = self.serv.getsockname()
        with open(support.TESTFN, 'rb') as file:
            with socket.create_connection(address,
                                          timeout=self.TIMEOUT) as sock:
                count = 100007
                meth = self.meth_from_sock(sock)
                sent = meth(file, offset=2007, count=count)
                self.assertEqual(sent, count)
                self.assertEqual(file.tell(), count + 2007)

    def testCountWithOffset(self):
        count = 100007
        conn = self.accept_conn()
        data = self.recv_data(conn)
        self.assertEqual(len(data), count)
        self.assertEqual(data, self.FILEDATA[2007:count+2007])

    # non blocking sockets are not supposed to work

    def _testNonBlocking(self):
        address = self.serv.getsockname()
        with open(support.TESTFN, 'rb') as file:
            with socket.create_connection(address) as sock:
                sock.setblocking(False)
                meth = self.meth_from_sock(sock)
                self.assertRaises(ValueError, meth, file)
                self.assertRaises(ValueError, sock.sendfile, file)

    def testNonBlocking(self):
        conn = self.accept_conn()
        if conn.recv(8192):
            self.fail('was not supposed to receive any data')

    # timeout (non-triggered)

    def _testWithTimeout(self):
        address = self.serv.getsockname()
        with open(support.TESTFN, 'rb') as file:
            with socket.create_connection(address,
                                          timeout=self.TIMEOUT) as sock:
                meth = self.meth_from_sock(sock)
                sent = meth(file)
                self.assertEqual(sent, self.FILESIZE)

    def testWithTimeout(self):
        conn = self.accept_conn()
        data = self.recv_data(conn)
        self.assertEqual(len(data), self.FILESIZE)
        self.assertEqual(data, self.FILEDATA)

    # timeout (triggered)

    def _testWithTimeoutTriggeredSend(self):
        address = self.serv.getsockname()
        with open(support.TESTFN, 'rb') as file:
            with socket.create_connection(address, timeout=0.01) as sock:
                meth = self.meth_from_sock(sock)
                self.assertRaises(socket.timeout, meth, file)

    def testWithTimeoutTriggeredSend(self):
        conn = self.accept_conn()
        # A single recv() only; the rest of the file is never read here.
        conn.recv(88192)

    # errors

    def _test_errors(self):
        pass

    def test_errors(self):
        # Datagram sockets are rejected.
        with open(support.TESTFN, 'rb') as file:
            with socket.socket(type=socket.SOCK_DGRAM) as s:
                meth = self.meth_from_sock(s)
                self.assertRaisesRegex(
                    ValueError, "SOCK_STREAM", meth, file)
        # Text-mode files are rejected.
        with open(support.TESTFN, 'rt') as file:
            with socket.socket() as s:
                meth = self.meth_from_sock(s)
                self.assertRaisesRegex(
                    ValueError, "binary mode", meth, file)
        # count must be a positive integer.
        with open(support.TESTFN, 'rb') as file:
            with socket.socket() as s:
                meth = self.meth_from_sock(s)
                self.assertRaisesRegex(TypeError, "positive integer",
                                       meth, file, count='2')
                self.assertRaisesRegex(TypeError, "positive integer",
                                       meth, file, count=0.1)
                self.assertRaisesRegex(ValueError, "positive integer",
                                       meth, file, count=0)
                self.assertRaisesRegex(ValueError, "positive integer",
                                       meth, file, count=-1)
5411
5412
@unittest.skipUnless(thread, 'Threading required for this test.')
@unittest.skipUnless(hasattr(os, "sendfile"),
                     'os.sendfile() required for this test.')
class SendfileUsingSendfileTest(SendfileUsingSendTest):
    """
    Re-run the inherited SendfileUsingSendTest cases against the
    os.sendfile()-based implementation of socket.sendfile().
    """
    def meth_from_sock(self, sock):
        # Select the private sendfile()-backed method of the socket as
        # the callable exercised by the inherited tests.
        return sock._sendfile_use_sendfile
5422
5423
@unittest.skipUnless(HAVE_SOCKET_ALG, 'AF_ALG required')
class LinuxKernelCryptoAPI(unittest.TestCase):
    """Tests for AF_ALG sockets."""

    def create_alg(self, typ, name):
        """Return an AF_ALG socket bound to the algorithm (typ, name).

        Raises unittest.SkipTest when bind() fails with FileNotFoundError,
        i.e. the requested type / algorithm is not available.  The socket
        is closed before any exception leaves this method.
        """
        sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
        try:
            sock.bind((typ, name))
        except FileNotFoundError as e:
            # type / algorithm is not available
            sock.close()
            # Build one readable reason string; passing several positional
            # arguments to SkipTest made the reported reason a tuple repr.
            raise unittest.SkipTest(
                "{} (type={!r}, name={!r})".format(e, typ, name))
        except BaseException:
            # Do not leak the socket on any other bind() failure.
            sock.close()
            raise
        else:
            return sock

    # bpo-31705: On kernel older than 4.5, sendto() failed with ENOKEY,
    # at least on ppc64le architecture
    @support.requires_linux_version(4, 5)
    def test_sha256(self):
        expected = bytes.fromhex("ba7816bf8f01cfea414140de5dae2223b00361a396"
                                 "177a9cb410ff61f20015ad")
        with self.create_alg('hash', 'sha256') as algo:
            # whole message in a single sendall()
            op, _ = algo.accept()
            with op:
                op.sendall(b"abc")
                self.assertEqual(op.recv(512), expected)

            # same message sent byte by byte with MSG_MORE, terminated by
            # an empty send without the flag
            op, _ = algo.accept()
            with op:
                op.send(b'a', socket.MSG_MORE)
                op.send(b'b', socket.MSG_MORE)
                op.send(b'c', socket.MSG_MORE)
                op.send(b'')
                self.assertEqual(op.recv(512), expected)

    def test_hmac_sha1(self):
        expected = bytes.fromhex("effcdf6ae5eb2fa2d27416d5f184df9c259a7c79")
        with self.create_alg('hash', 'hmac(sha1)') as algo:
            # the key is set on the bound socket before accept()
            algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, b"Jefe")
            op, _ = algo.accept()
            with op:
                op.sendall(b"what do ya want for nothing?")
                self.assertEqual(op.recv(512), expected)

    # Although it should work with 3.19 and newer the test blocks on
    # Ubuntu 15.10 with Kernel 4.2.0-19.
    @support.requires_linux_version(4, 3)
    def test_aes_cbc(self):
        key = bytes.fromhex('06a9214036b8a15b512e03d534120006')
        iv = bytes.fromhex('3dafba429d9eb430b422da802c9fac41')
        msg = b"Single block msg"
        ciphertext = bytes.fromhex('e353779c1079aeb82708942dbe77181a')
        msglen = len(msg)
        with self.create_alg('skcipher', 'cbc(aes)') as algo:
            algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, key)

            # encrypt: control message first (MSG_MORE), payload afterwards
            op, _ = algo.accept()
            with op:
                op.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, iv=iv,
                                 flags=socket.MSG_MORE)
                op.sendall(msg)
                self.assertEqual(op.recv(msglen), ciphertext)

            # decrypt: payload and control data in one call
            op, _ = algo.accept()
            with op:
                op.sendmsg_afalg([ciphertext],
                                 op=socket.ALG_OP_DECRYPT, iv=iv)
                self.assertEqual(op.recv(msglen), msg)

            # long message
            multiplier = 1024
            longmsg = [msg] * multiplier
            op, _ = algo.accept()
            with op:
                op.sendmsg_afalg(longmsg,
                                 op=socket.ALG_OP_ENCRYPT, iv=iv)
                enc = op.recv(msglen * multiplier)
            self.assertEqual(len(enc), msglen * multiplier)
            self.assertEqual(enc[:msglen], ciphertext)

            # decrypting the long ciphertext must give the long message back
            op, _ = algo.accept()
            with op:
                op.sendmsg_afalg([enc],
                                 op=socket.ALG_OP_DECRYPT, iv=iv)
                dec = op.recv(msglen * multiplier)
            self.assertEqual(len(dec), msglen * multiplier)
            self.assertEqual(dec, msg * multiplier)

    @support.requires_linux_version(4, 9)  # see issue29324
    def test_aead_aes_gcm(self):
        key = bytes.fromhex('c939cc13397c1d37de6ae0e1cb7c423c')
        iv = bytes.fromhex('b3d8cc017cbb89b39e0f67e2')
        plain = bytes.fromhex('c3b3c41f113a31b73d9a5cd432103069')
        assoc = bytes.fromhex('24825602bd12a984e0092d3e448eda5f')
        expected_ct = bytes.fromhex('93fe7d9e9bfd10348a5606e5cafa7354')
        expected_tag = bytes.fromhex('0032a1dc85f1c9786925a2e71d8272dd')

        taglen = len(expected_tag)
        assoclen = len(assoc)

        with self.create_alg('aead', 'gcm(aes)') as algo:
            algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, key)
            algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_AEAD_AUTHSIZE,
                            None, taglen)

            # send assoc, plain and tag buffer in separate steps
            op, _ = algo.accept()
            with op:
                op.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, iv=iv,
                                 assoclen=assoclen, flags=socket.MSG_MORE)
                op.sendall(assoc, socket.MSG_MORE)
                op.sendall(plain)
                res = op.recv(assoclen + len(plain) + taglen)
                # the checks below slice the result as: assoclen leading
                # bytes, then the ciphertext, then taglen bytes of tag
                self.assertEqual(expected_ct, res[assoclen:-taglen])
                self.assertEqual(expected_tag, res[-taglen:])

            # now with msg
            op, _ = algo.accept()
            with op:
                msg = assoc + plain
                op.sendmsg_afalg([msg], op=socket.ALG_OP_ENCRYPT, iv=iv,
                                 assoclen=assoclen)
                res = op.recv(assoclen + len(plain) + taglen)
                self.assertEqual(expected_ct, res[assoclen:-taglen])
                self.assertEqual(expected_tag, res[-taglen:])

            # create anc data manually
            pack_uint32 = struct.Struct('I').pack
            op, _ = algo.accept()
            with op:
                msg = assoc + plain
                op.sendmsg(
                    [msg],
                    ([socket.SOL_ALG, socket.ALG_SET_OP, pack_uint32(socket.ALG_OP_ENCRYPT)],
                     [socket.SOL_ALG, socket.ALG_SET_IV, pack_uint32(len(iv)) + iv],
                     [socket.SOL_ALG, socket.ALG_SET_AEAD_ASSOCLEN, pack_uint32(assoclen)],
                    )
                )
                res = op.recv(len(msg) + taglen)
                self.assertEqual(expected_ct, res[assoclen:-taglen])
                self.assertEqual(expected_tag, res[-taglen:])

            # decrypt and verify
            op, _ = algo.accept()
            with op:
                msg = assoc + expected_ct + expected_tag
                op.sendmsg_afalg([msg], op=socket.ALG_OP_DECRYPT, iv=iv,
                                 assoclen=assoclen)
                res = op.recv(len(msg) - taglen)
                self.assertEqual(plain, res[assoclen:])

    @support.requires_linux_version(4, 3)  # see test_aes_cbc
    def test_drbg_pr_sha256(self):
        # deterministic random bit generator, prediction resistance, sha256
        with self.create_alg('rng', 'drbg_pr_sha256') as algo:
            extra_seed = os.urandom(32)
            algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, extra_seed)
            op, _ = algo.accept()
            with op:
                # only the length of the output can be checked
                rn = op.recv(32)
                self.assertEqual(len(rn), 32)

    def test_sendmsg_afalg_args(self):
        # invalid argument combinations must raise TypeError
        sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
        with sock:
            with self.assertRaises(TypeError):
                sock.sendmsg_afalg()

            with self.assertRaises(TypeError):
                sock.sendmsg_afalg(op=None)

            with self.assertRaises(TypeError):
                sock.sendmsg_afalg(1)

            with self.assertRaises(TypeError):
                sock.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, assoclen=None)

            with self.assertRaises(TypeError):
                sock.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, assoclen=-1)

    def test_length_restriction(self):
        # bpo-35050, off-by-one error in length check
        sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
        self.addCleanup(sock.close)

        # salg_type[14]
        # 13 characters pass the length check (bind() then fails with
        # FileNotFoundError); 14 are rejected with ValueError.
        with self.assertRaises(FileNotFoundError):
            sock.bind(("t" * 13, "name"))
        with self.assertRaisesRegex(ValueError, "type too long"):
            sock.bind(("t" * 14, "name"))

        # salg_name[64]
        # likewise: 63 characters pass the length check, 64 do not.
        with self.assertRaises(FileNotFoundError):
            sock.bind(("type", "n" * 63))
        with self.assertRaisesRegex(ValueError, "name too long"):
            sock.bind(("type", "n" * 64))
5618
5619
@unittest.skipUnless(sys.platform.startswith("win"), "requires Windows")
class TestMSWindowsTCPFlags(unittest.TestCase):
    # Names starting with "TCP" that the socket module is allowed to
    # expose; test_new_tcp_flags() fails on any name not listed here.
    knownTCPFlags = {
        # available since long time ago
        'TCP_MAXSEG',
        'TCP_NODELAY',
        # available starting with Windows 10 1607
        'TCP_FASTOPEN',
        # available starting with Windows 10 1703
        'TCP_KEEPCNT',
    }

    def test_new_tcp_flags(self):
        # Collect, in dir() order, every "TCP"-prefixed attribute of the
        # socket module that is missing from knownTCPFlags.
        unexpected = []
        for attr in dir(socket):
            if attr.startswith('TCP') and attr not in self.knownTCPFlags:
                unexpected.append(attr)

        self.assertEqual([], unexpected,
            "New TCP flags were discovered. See bpo-32394 for more information")
5638
def test_main():
    """Run every test class of this module through support.run_unittest().

    The run is bracketed by support.threading_setup() and
    support.threading_cleanup(); the cleanup is performed even when
    run_unittest() raises.
    """
    # Order matters: classes are handed to run_unittest() in list order.
    tests = [
        GeneralModuleTests,
        BasicTCPTest,
        TCPCloserTest,
        TCPTimeoutTest,
        TestExceptions,
        BufferIOTest,
        BasicTCPTest2,
        BasicUDPTest,
        UDPTimeoutTest,
        NonBlockingTCPTests,
        FileObjectClassTestCase,
        UnbufferedFileObjectClassTestCase,
        LineBufferedFileObjectClassTestCase,
        SmallBufferedFileObjectClassTestCase,
        UnicodeReadFileObjectClassTestCase,
        UnicodeWriteFileObjectClassTestCase,
        UnicodeReadWriteFileObjectClassTestCase,
        NetworkConnectionNoServer,
        NetworkConnectionAttributesTest,
        NetworkConnectionBehaviourTest,
        ContextManagersTest,
        InheritanceTest,
        NonblockConstantTest,
        BasicSocketPairTest,
        TestUnixDomain,
        TestLinuxAbstractNamespace,
        TIPCTest,
        TIPCThreadableTest,
        BasicCANTest,
        CANTest,
        BasicRDSTest,
        RDSTest,
        LinuxKernelCryptoAPI,
        CmsgMacroTests,
        SendmsgUDPTest,
        RecvmsgUDPTest,
        RecvmsgIntoUDPTest,
        SendmsgUDP6Test,
        RecvmsgUDP6Test,
        RecvmsgRFC3542AncillaryUDP6Test,
        RecvmsgIntoRFC3542AncillaryUDP6Test,
        RecvmsgIntoUDP6Test,
        SendmsgTCPTest,
        RecvmsgTCPTest,
        RecvmsgIntoTCPTest,
        SendmsgSCTPStreamTest,
        RecvmsgSCTPStreamTest,
        RecvmsgIntoSCTPStreamTest,
        SendmsgUnixStreamTest,
        RecvmsgUnixStreamTest,
        RecvmsgIntoUnixStreamTest,
        RecvmsgSCMRightsStreamTest,
        RecvmsgIntoSCMRightsStreamTest,
        # These are slow when setitimer() is not available
        InterruptedRecvTimeoutTest,
        InterruptedSendTimeoutTest,
        TestSocketSharing,
        SendfileUsingSendTest,
        SendfileUsingSendfileTest,
        TestMSWindowsTCPFlags,
    ]

    thread_info = support.threading_setup()
    try:
        support.run_unittest(*tests)
    finally:
        # Previously skipped whenever run_unittest() raised.
        support.threading_cleanup(*thread_info)
5699
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()
5702